max bucket reducer and sibling reducer framework

This commit is contained in:
Colin Goodheart-Smithe 2015-03-17 16:36:37 -07:00
parent a824184bf2
commit e19d20b407
26 changed files with 1010 additions and 128 deletions

View File

@ -28,7 +28,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.highlight.HighlightBuilder; import org.elasticsearch.search.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
@ -162,9 +164,9 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
} }
/** /**
* Delegates to {@link PercolateSourceBuilder#addAggregation(AggregationBuilder)} * Delegates to {@link PercolateSourceBuilder#addAggregation(AbstractAggregationBuilder)}
*/ */
public PercolateRequestBuilder addAggregation(AggregationBuilder aggregationBuilder) { public PercolateRequestBuilder addAggregation(AbstractAggregationBuilder aggregationBuilder) {
sourceBuilder().addAggregation(aggregationBuilder); sourceBuilder().addAggregation(aggregationBuilder);
return this; return this;
} }

View File

@ -19,13 +19,18 @@
package org.elasticsearch.action.percolate; package org.elasticsearch.action.percolate;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.percolator.PercolateContext; import org.elasticsearch.percolator.PercolateContext;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.highlight.HighlightField; import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
@ -51,6 +56,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
private int requestedSize; private int requestedSize;
private InternalAggregations aggregations; private InternalAggregations aggregations;
private List<SiblingReducer> reducers;
PercolateShardResponse() { PercolateShardResponse() {
hls = new ArrayList<>(); hls = new ArrayList<>();
@ -69,6 +75,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
if (result.aggregations() != null) { if (result.aggregations() != null) {
this.aggregations = (InternalAggregations) result.aggregations(); this.aggregations = (InternalAggregations) result.aggregations();
} }
this.reducers = result.reducers();
} }
} }
@ -112,6 +119,10 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
return aggregations; return aggregations;
} }
public List<SiblingReducer> reducers() {
return reducers;
}
public byte percolatorTypeId() { public byte percolatorTypeId() {
return percolatorTypeId; return percolatorTypeId;
} }
@ -144,6 +155,16 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
hls.add(fields); hls.add(fields);
} }
aggregations = InternalAggregations.readOptionalAggregations(in); aggregations = InternalAggregations.readOptionalAggregations(in);
if (in.readBoolean()) {
int reducersSize = in.readVInt();
List<SiblingReducer> reducers = new ArrayList<>(reducersSize);
for (int i = 0; i < reducersSize; i++) {
BytesReference type = in.readBytesReference();
Reducer reducer = ReducerStreams.stream(type).readResult(in);
reducers.add((SiblingReducer) reducer);
}
this.reducers = reducers;
}
} }
@Override @Override
@ -169,5 +190,15 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
} }
} }
out.writeOptionalStreamable(aggregations); out.writeOptionalStreamable(aggregations);
if (reducers == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(reducers.size());
for (Reducer reducer : reducers) {
out.writeBytesReference(reducer.type().stream());
reducer.writeTo(out);
}
}
} }
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.highlight.HighlightBuilder; import org.elasticsearch.search.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
@ -50,7 +51,7 @@ public class PercolateSourceBuilder implements ToXContent {
private List<SortBuilder> sorts; private List<SortBuilder> sorts;
private Boolean trackScores; private Boolean trackScores;
private HighlightBuilder highlightBuilder; private HighlightBuilder highlightBuilder;
private List<AggregationBuilder> aggregations; private List<AbstractAggregationBuilder> aggregations;
/** /**
* Sets the document to run the percolate queries against. * Sets the document to run the percolate queries against.
@ -130,7 +131,7 @@ public class PercolateSourceBuilder implements ToXContent {
/** /**
* Add an aggregation definition. * Add an aggregation definition.
*/ */
public PercolateSourceBuilder addAggregation(AggregationBuilder aggregationBuilder) { public PercolateSourceBuilder addAggregation(AbstractAggregationBuilder aggregationBuilder) {
if (aggregations == null) { if (aggregations == null) {
aggregations = Lists.newArrayList(); aggregations = Lists.newArrayList();
} }

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.Scroll; import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder; import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder;
import org.elasticsearch.search.highlight.HighlightBuilder; import org.elasticsearch.search.highlight.HighlightBuilder;

View File

@ -19,11 +19,20 @@
package org.elasticsearch.percolator; package org.elasticsearch.percolator;
import com.carrotsearch.hppc.ByteObjectOpenHashMap; import com.carrotsearch.hppc.ByteObjectOpenHashMap;
import com.google.common.collect.Lists;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.memory.ExtendedMemoryIndex; import org.apache.lucene.index.memory.ExtendedMemoryIndex;
import org.apache.lucene.index.memory.MemoryIndex; import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.*; import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal; import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -58,20 +67,30 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter; import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.*; import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationPhase; import org.elasticsearch.search.aggregations.AggregationPhase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.highlight.HighlightField; import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.highlight.HighlightPhase; import org.elasticsearch.search.highlight.HighlightPhase;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
@ -83,7 +102,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.index.mapper.SourceToParse.source; import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.percolator.QueryCollector.*; import static org.elasticsearch.percolator.QueryCollector.count;
import static org.elasticsearch.percolator.QueryCollector.match;
import static org.elasticsearch.percolator.QueryCollector.matchAndScore;
public class PercolatorService extends AbstractComponent { public class PercolatorService extends AbstractComponent {
@ -826,15 +847,29 @@ public class PercolatorService extends AbstractComponent {
return null; return null;
} }
InternalAggregations aggregations;
if (shardResults.size() == 1) { if (shardResults.size() == 1) {
return shardResults.get(0).aggregations(); aggregations = shardResults.get(0).aggregations();
} else {
List<InternalAggregations> aggregationsList = new ArrayList<>(shardResults.size());
for (PercolateShardResponse shardResult : shardResults) {
aggregationsList.add(shardResult.aggregations());
}
aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
} }
if (aggregations != null) {
List<InternalAggregations> aggregationsList = new ArrayList<>(shardResults.size()); List<SiblingReducer> reducers = shardResults.get(0).reducers();
for (PercolateShardResponse shardResult : shardResults) { if (reducers != null) {
aggregationsList.add(shardResult.aggregations()); List<InternalAggregation> newAggs = new ArrayList<>(Lists.transform(aggregations.asList(), Reducer.AGGREGATION_TRANFORM_FUNCTION));
for (SiblingReducer reducer : reducers) {
InternalAggregation newAgg = reducer.doReduce(new InternalAggregations(newAggs), new ReduceContext(null, bigArrays,
scriptService));
newAggs.add(newAgg);
}
aggregations = new InternalAggregations(newAggs);
}
} }
return InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService)); return aggregations;
} }
} }

View File

@ -56,6 +56,7 @@ import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser; import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser; import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketParser;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser; import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule; import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule;
@ -103,6 +104,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
aggParsers.add(ChildrenParser.class); aggParsers.add(ChildrenParser.class);
reducerParsers.add(DerivativeParser.class); reducerParsers.add(DerivativeParser.class);
reducerParsers.add(MaxBucketParser.class);
reducerParsers.add(MovAvgParser.class); reducerParsers.add(MovAvgParser.class);
} }

View File

@ -30,6 +30,8 @@ import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.SearchPhase;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhaseExecutionException; import org.elasticsearch.search.query.QueryPhaseExecutionException;
@ -74,8 +76,11 @@ public class AggregationPhase implements SearchPhase {
List<Aggregator> collectors = new ArrayList<>(); List<Aggregator> collectors = new ArrayList<>();
Aggregator[] aggregators; Aggregator[] aggregators;
List<Reducer> reducers;
try { try {
aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext); AggregatorFactories factories = context.aggregations().factories();
aggregators = factories.createTopLevelAggregators(aggregationContext);
reducers = factories.createReducers();
} catch (IOException e) { } catch (IOException e) {
throw new AggregationInitializationException("Could not initialize aggregators", e); throw new AggregationInitializationException("Could not initialize aggregators", e);
} }
@ -136,6 +141,21 @@ public class AggregationPhase implements SearchPhase {
} }
} }
context.queryResult().aggregations(new InternalAggregations(aggregations)); context.queryResult().aggregations(new InternalAggregations(aggregations));
try {
List<Reducer> reducers = context.aggregations().factories().createReducers();
List<SiblingReducer> siblingReducers = new ArrayList<>(reducers.size());
for (Reducer reducer : reducers) {
if (reducer instanceof SiblingReducer) {
siblingReducers.add((SiblingReducer) reducer);
} else {
throw new AggregationExecutionException("Invalid reducer named [" + reducer.name() + "] of type ["
+ reducer.type().name() + "]. Only sibling reducers are allowed at the top level");
}
}
context.queryResult().reducers(siblingReducers);
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build top level reducers", e);
}
// disable aggregations so that they don't run on next pages in case of scrolling // disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null); context.aggregations(null);

View File

@ -56,7 +56,7 @@ public class AggregatorFactories {
public List<Reducer> createReducers() throws IOException { public List<Reducer> createReducers() throws IOException {
List<Reducer> reducers = new ArrayList<>(); List<Reducer> reducers = new ArrayList<>();
for (ReducerFactory factory : this.reducerFactories) { for (ReducerFactory factory : this.reducerFactories) {
reducers.add(factory.create(null, null, false)); // NOCOMIT add context, parent etc. reducers.add(factory.create());
} }
return reducers; return reducers;
} }
@ -213,14 +213,18 @@ public class AggregatorFactories {
temporarilyMarked.add(factory); temporarilyMarked.add(factory);
String[] bucketsPaths = factory.getBucketsPaths(); String[] bucketsPaths = factory.getBucketsPaths();
for (String bucketsPath : bucketsPaths) { for (String bucketsPath : bucketsPaths) {
ReducerFactory matchingFactory = reducerFactoriesMap.get(bucketsPath); int aggSepIndex = bucketsPath.indexOf('>');
if (bucketsPath.equals("_count") || bucketsPath.equals("_key") || aggFactoryNames.contains(bucketsPath)) { String firstAggName = aggSepIndex == -1 ? bucketsPath : bucketsPath.substring(0, aggSepIndex);
if (bucketsPath.equals("_count") || bucketsPath.equals("_key") || aggFactoryNames.contains(firstAggName)) {
continue; continue;
} else if (matchingFactory != null) {
resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked,
matchingFactory);
} else { } else {
throw new ElasticsearchIllegalStateException("No reducer found for path [" + bucketsPath + "]"); ReducerFactory matchingFactory = reducerFactoriesMap.get(firstAggName);
if (matchingFactory != null) {
resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories,
temporarilyMarked, matchingFactory);
} else {
throw new ElasticsearchIllegalStateException("No aggregation found for path [" + bucketsPath + "]");
}
} }
} }
unmarkedFactories.remove(factory); unmarkedFactories.remove(factory);

View File

@ -58,6 +58,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits; import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount; import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue; import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketReducer;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer; import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer;
import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule; import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule;
@ -106,10 +108,12 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
InternalTopHits.registerStreams(); InternalTopHits.registerStreams();
InternalGeoBounds.registerStream(); InternalGeoBounds.registerStream();
InternalChildren.registerStream(); InternalChildren.registerStream();
InternalSimpleValue.registerStreams();
// Reducers // Reducers
DerivativeReducer.registerStreams(); DerivativeReducer.registerStreams();
InternalSimpleValue.registerStreams();
InternalBucketMetricValue.registerStreams();
MaxBucketReducer.registerStreams();
MovAvgReducer.registerStreams(); MovAvgReducer.registerStreams();
} }

View File

@ -25,8 +25,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InvalidAggregationPathException; import org.elasticsearch.search.aggregations.InvalidAggregationPathException;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.aggregations.support.AggregationPath;
@ -143,10 +143,16 @@ public class BucketHelpers {
* @param gapPolicy The gap policy to apply if empty buckets are found * @param gapPolicy The gap policy to apply if empty buckets are found
* @return The value extracted from <code>bucket</code> found at <code>aggPath</code> * @return The value extracted from <code>bucket</code> found at <code>aggPath</code>
*/ */
public static Double resolveBucketValue(InternalHistogram<? extends InternalHistogram.Bucket> histo, InternalHistogram.Bucket bucket, public static Double resolveBucketValue(InternalMultiBucketAggregation<?, ? extends InternalMultiBucketAggregation.Bucket> agg,
String aggPath, GapPolicy gapPolicy) { InternalMultiBucketAggregation.Bucket bucket, String aggPath, GapPolicy gapPolicy) {
List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
return resolveBucketValue(agg, bucket, aggPathsList, gapPolicy);
}
public static Double resolveBucketValue(InternalMultiBucketAggregation<?, ? extends InternalMultiBucketAggregation.Bucket> agg,
InternalMultiBucketAggregation.Bucket bucket, List<String> aggPathsList, GapPolicy gapPolicy) {
try { try {
Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(aggPath).getPathElementsAsStringList()); Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList);
if (propertyValue == null) { if (propertyValue == null) {
throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName()
+ " must reference either a number value or a single value numeric metric aggregation"); + " must reference either a number value or a single value numeric metric aggregation");

View File

@ -19,11 +19,14 @@
package org.elasticsearch.search.aggregations.reducers; package org.elasticsearch.search.aggregations.reducers;
import com.google.common.base.Function;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregation.Type;
@ -66,6 +69,13 @@ public abstract class Reducer implements Streamable {
} }
public static final Function<Aggregation, InternalAggregation> AGGREGATION_TRANFORM_FUNCTION = new Function<Aggregation, InternalAggregation>() {
@Override
public InternalAggregation apply(Aggregation input) {
return (InternalAggregation) input;
}
};
private String name; private String name;
private String[] bucketsPaths; private String[] bucketsPaths;
private Map<String, Object> metaData; private Map<String, Object> metaData;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -28,10 +29,8 @@ import java.util.Map;
/** /**
* A base class for all reducer builders. * A base class for all reducer builders.
*/ */
public abstract class ReducerBuilder<B extends ReducerBuilder<B>> implements ToXContent { public abstract class ReducerBuilder<B extends ReducerBuilder<B>> extends AbstractAggregationBuilder {
private final String name;
protected final String type;
private String[] bucketsPaths; private String[] bucketsPaths;
private Map<String, Object> metaData; private Map<String, Object> metaData;
@ -39,15 +38,7 @@ public abstract class ReducerBuilder<B extends ReducerBuilder<B>> implements ToX
* Sole constructor, typically used by sub-classes. * Sole constructor, typically used by sub-classes.
*/ */
protected ReducerBuilder(String name, String type) { protected ReducerBuilder(String name, String type) {
this.name = name; super(name, type);
this.type = type;
}
/**
* Return the name of the reducer that is being built.
*/
public String getName() {
return name;
} }
/** /**

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.reducers; package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketBuilder;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder; import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder;
@ -31,6 +32,10 @@ public final class ReducerBuilders {
return new DerivativeBuilder(name); return new DerivativeBuilder(name);
} }
public static final MaxBucketBuilder maxBucket(String name) {
return new MaxBucketBuilder(name);
}
public static final MovAvgBuilder smooth(String name) { public static final MovAvgBuilder smooth(String name) {
return new MovAvgBuilder(name); return new MovAvgBuilder(name);
} }

View File

@ -20,7 +20,6 @@ package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -62,8 +61,7 @@ public abstract class ReducerFactory {
doValidate(parent, factories, reducerFactories); doValidate(parent, factories, reducerFactories);
} }
protected abstract Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, protected abstract Reducer createInternal(Map<String, Object> metaData) throws IOException;
Map<String, Object> metaData) throws IOException;
/** /**
* Creates the reducer * Creates the reducer
@ -81,8 +79,8 @@ public abstract class ReducerFactory {
* *
* @return The created aggregator * @return The created aggregator
*/ */
public final Reducer create(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket) throws IOException { public final Reducer create() throws IOException {
Reducer aggregator = createInternal(context, parent, collectsFromSingleBucket, this.metaData); Reducer aggregator = createInternal(this.metaData);
return aggregator; return aggregator;
} }

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import com.google.common.collect.Lists;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public abstract class SiblingReducer extends Reducer {
protected SiblingReducer() { // for Serialisation
super();
}
protected SiblingReducer(String name, String[] bucketsPaths, Map<String, Object> metaData) {
super(name, bucketsPaths, metaData);
}
@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
List<Bucket> newBuckets = new ArrayList<>();
for (int i = 0; i < buckets.size(); i++) {
InternalMultiBucketAggregation.InternalBucket bucket = (InternalMultiBucketAggregation.InternalBucket) buckets.get(i);
InternalAggregation aggToAdd = doReduce(bucket.getAggregations(), reduceContext);
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
aggs.add(aggToAdd);
InternalMultiBucketAggregation.InternalBucket newBucket = multiBucketsAgg.createBucket(new InternalAggregations(aggs), bucket);
newBuckets.add(newBucket);
}
return multiBucketsAgg.create(newBuckets);
}
public abstract InternalAggregation doReduce(Aggregations aggregations, ReduceContext context);
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class InternalBucketMetricValue extends InternalNumericMetricsAggregation.SingleValue {
public final static Type TYPE = new Type("bucket_metric_value");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalBucketMetricValue readResult(StreamInput in) throws IOException {
InternalBucketMetricValue result = new InternalBucketMetricValue();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private double value;
private String[] keys;
protected InternalBucketMetricValue() {
super();
}
public InternalBucketMetricValue(String name, String[] keys, double value, @Nullable ValueFormatter formatter,
List<Reducer> reducers, Map<String, Object> metaData) {
super(name, reducers, metaData);
this.keys = keys;
this.value = value;
this.valueFormatter = formatter;
}
@Override
public Type type() {
return TYPE;
}
@Override
public double value() {
return value;
}
public String[] keys() {
return keys;
}
@Override
public InternalAggregation doReduce(ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
return this;
} else if (path.size() == 1 && "value".equals(path.get(0))) {
return value();
} else if (path.size() == 1 && "keys".equals(path.get(0))) {
return keys();
} else {
throw new ElasticsearchIllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
valueFormatter = ValueFormatterStreams.readOptional(in);
value = in.readDouble();
keys = in.readStringArray();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeDouble(value);
out.writeStringArray(keys);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
boolean hasValue = !Double.isInfinite(value);
builder.field(CommonFields.VALUE, hasValue ? value : null);
if (hasValue && valueFormatter != null) {
builder.field(CommonFields.VALUE_AS_STRING, valueFormatter.format(value));
}
builder.startArray("keys");
for (String key : keys) {
builder.value(key);
}
builder.endArray();
return builder;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import java.io.IOException;
public class MaxBucketBuilder extends ReducerBuilder<MaxBucketBuilder> {
private String format;
public MaxBucketBuilder(String name) {
super(name, MaxBucketReducer.TYPE.name());
}
public MaxBucketBuilder format(String format) {
this.format = format;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(MaxBucketParser.FORMAT.getPreferredName(), format);
}
return builder;
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class MaxBucketParser implements Reducer.Parser {
public static final ParseField FORMAT = new ParseField("format");
@Override
public String type() {
return MaxBucketReducer.TYPE.name();
}
@Override
public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (FORMAT.match(currentFieldName)) {
format = parser.text();
} else if (BUCKETS_PATH.match(currentFieldName)) {
bucketsPaths = new String[] { parser.text() };
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (BUCKETS_PATH.match(currentFieldName)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].");
}
}
if (bucketsPaths == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for derivative aggregation [" + reducerName + "]");
}
ValueFormatter formatter = null;
if (format != null) {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
}
return new MaxBucketReducer.Factory(reducerName, bucketsPaths, formatter);
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MaxBucketReducer extends SiblingReducer {
public final static Type TYPE = new Type("max_bucket");
public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() {
@Override
public MaxBucketReducer readResult(StreamInput in) throws IOException {
MaxBucketReducer result = new MaxBucketReducer();
result.readFrom(in);
return result;
}
};
private ValueFormatter formatter;
public static void registerStreams() {
ReducerStreams.registerStream(STREAM, TYPE.stream());
}
private MaxBucketReducer() {
}
protected MaxBucketReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, Map<String, Object> metaData) {
super(name, bucketsPaths, metaData);
this.formatter = formatter;
}
@Override
public Type type() {
return TYPE;
}
public InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
List<String> maxBucketKeys = new ArrayList<>();
double maxValue = Double.NEGATIVE_INFINITY;
List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
for (Aggregation aggregation : aggregations) {
if (aggregation.getName().equals(bucketsPath.get(0))) {
bucketsPath = bucketsPath.subList(1, bucketsPath.size());
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
for (int i = 0; i < buckets.size(); i++) {
Bucket bucket = buckets.get(i);
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, GapPolicy.IGNORE);
if (bucketValue != null) {
if (bucketValue > maxValue) {
maxBucketKeys.clear();
maxBucketKeys.add(bucket.getKeyAsString());
maxValue = bucketValue;
} else if (bucketValue.equals(maxValue)) {
maxBucketKeys.add(bucket.getKeyAsString());
}
}
}
}
}
String[] keys = maxBucketKeys.toArray(new String[maxBucketKeys.size()]);
return new InternalBucketMetricValue(name(), keys, maxValue, formatter, Collections.EMPTY_LIST, metaData());
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
}
public static class Factory extends ReducerFactory {
private final ValueFormatter formatter;
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter) {
super(name, TYPE.name(), bucketsPaths);
this.formatter = formatter;
}
@Override
protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
return new MaxBucketReducer(name, bucketsPaths, formatter, metaData);
}
@Override
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List<ReducerFactory> reducerFactories) {
if (bucketsPaths.length != 1) {
throw new ElasticsearchIllegalStateException(Reducer.Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for reducer [" + name + "]");
}
}
}
}

View File

@ -19,15 +19,12 @@
package org.elasticsearch.search.aggregations.reducers.derivative; package org.elasticsearch.search.aggregations.reducers.derivative;
import com.google.common.base.Function;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
@ -40,7 +37,6 @@ import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory; import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams; import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
@ -68,13 +64,6 @@ public class DerivativeReducer extends Reducer {
ReducerStreams.registerStream(STREAM, TYPE.stream()); ReducerStreams.registerStream(STREAM, TYPE.stream());
} }
private static final Function<Aggregation, InternalAggregation> FUNCTION = new Function<Aggregation, InternalAggregation>() {
@Override
public InternalAggregation apply(Aggregation input) {
return (InternalAggregation) input;
}
};
private ValueFormatter formatter; private ValueFormatter formatter;
private GapPolicy gapPolicy; private GapPolicy gapPolicy;
@ -106,7 +95,7 @@ public class DerivativeReducer extends Reducer {
if (lastBucketValue != null) { if (lastBucketValue != null) {
double diff = thisBucketValue - lastBucketValue; double diff = thisBucketValue - lastBucketValue;
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION)); List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<Reducer>(), metaData())); aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<Reducer>(), metaData()));
InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations( InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
aggs), bucket.getKeyed(), bucket.getFormatter()); aggs), bucket.getKeyed(), bucket.getFormatter());
@ -143,8 +132,7 @@ public class DerivativeReducer extends Reducer {
} }
@Override @Override
protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
Map<String, Object> metaData) throws IOException {
return new DerivativeReducer(name, bucketsPaths, formatter, gapPolicy, metaData); return new DerivativeReducer(name, bucketsPaths, formatter, gapPolicy, metaData);
} }

View File

@ -22,19 +22,26 @@ package org.elasticsearch.search.aggregations.reducers.movavg;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.EvictingQueue; import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.*; import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.reducers.*; import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel; import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelStreams; import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelStreams;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
@ -43,7 +50,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.resolveBucketValue; import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.resolveBucketValue;
public class MovAvgReducer extends Reducer { public class MovAvgReducer extends Reducer {
@ -155,8 +161,7 @@ public class MovAvgReducer extends Reducer {
} }
@Override @Override
protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
Map<String, Object> metaData) throws IOException {
return new MovAvgReducer(name, bucketsPaths, formatter, gapPolicy, window, model, metaData); return new MovAvgReducer(name, bucketsPaths, formatter, gapPolicy, window, model, metaData);
} }

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectFloatOpenHashMap;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
@ -38,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder; import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder;
import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.search.highlight.HighlightBuilder; import org.elasticsearch.search.highlight.HighlightBuilder;
@ -55,8 +57,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* A search source builder allowing to easily build search source. Simple construction * A search source builder allowing to easily build search source. Simple
* using {@link org.elasticsearch.search.builder.SearchSourceBuilder#searchSource()}. * construction using
* {@link org.elasticsearch.search.builder.SearchSourceBuilder#searchSource()}.
* *
* @see org.elasticsearch.action.search.SearchRequest#source(SearchSourceBuilder) * @see org.elasticsearch.action.search.SearchRequest#source(SearchSourceBuilder)
*/ */
@ -109,7 +112,6 @@ public class SearchSourceBuilder implements ToXContent {
private List<AbstractAggregationBuilder> aggregations; private List<AbstractAggregationBuilder> aggregations;
private BytesReference aggregationsBinary; private BytesReference aggregationsBinary;
private HighlightBuilder highlightBuilder; private HighlightBuilder highlightBuilder;
private SuggestBuilder suggestBuilder; private SuggestBuilder suggestBuilder;
@ -123,7 +125,6 @@ public class SearchSourceBuilder implements ToXContent {
private String[] stats; private String[] stats;
/** /**
* Constructs a new search source builder. * Constructs a new search source builder.
*/ */
@ -190,8 +191,9 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Sets a filter that will be executed after the query has been executed and only has affect on the search hits * Sets a filter that will be executed after the query has been executed and
* (not aggregations). This filter is always executed as last filtering mechanism. * only has affect on the search hits (not aggregations). This filter is
* always executed as last filtering mechanism.
*/ */
public SearchSourceBuilder postFilter(FilterBuilder postFilter) { public SearchSourceBuilder postFilter(FilterBuilder postFilter) {
this.postFilterBuilder = postFilter; this.postFilterBuilder = postFilter;
@ -276,8 +278,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with an * Should each {@link org.elasticsearch.search.SearchHit} be returned with
* explanation of the hit (ranking). * an explanation of the hit (ranking).
*/ */
public SearchSourceBuilder explain(Boolean explain) { public SearchSourceBuilder explain(Boolean explain) {
this.explain = explain; this.explain = explain;
@ -285,8 +287,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with a version * Should each {@link org.elasticsearch.search.SearchHit} be returned with a
* associated with it. * version associated with it.
*/ */
public SearchSourceBuilder version(Boolean version) { public SearchSourceBuilder version(Boolean version) {
this.version = version; this.version = version;
@ -310,21 +312,24 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* An optional terminate_after to terminate the search after * An optional terminate_after to terminate the search after collecting
* collecting <code>terminateAfter</code> documents * <code>terminateAfter</code> documents
*/ */
public SearchSourceBuilder terminateAfter(int terminateAfter) { public SearchSourceBuilder terminateAfter(int terminateAfter) {
if (terminateAfter <= 0) { if (terminateAfter <= 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0"); throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
} }
this.terminateAfter = terminateAfter; this.terminateAfter = terminateAfter;
return this; return this;
} }
/** /**
* Adds a sort against the given field name and the sort ordering. * Adds a sort against the given field name and the sort ordering.
* *
* @param name The name of the field * @param name
* @param order The sort ordering * The name of the field
* @param order
* The sort ordering
*/ */
public SearchSourceBuilder sort(String name, SortOrder order) { public SearchSourceBuilder sort(String name, SortOrder order) {
return sort(SortBuilders.fieldSort(name).order(order)); return sort(SortBuilders.fieldSort(name).order(order));
@ -333,7 +338,8 @@ public class SearchSourceBuilder implements ToXContent {
/** /**
* Add a sort against the given field name. * Add a sort against the given field name.
* *
* @param name The name of the field to sort by * @param name
* The name of the field to sort by
*/ */
public SearchSourceBuilder sort(String name) { public SearchSourceBuilder sort(String name) {
return sort(SortBuilders.fieldSort(name)); return sort(SortBuilders.fieldSort(name));
@ -351,8 +357,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Applies when sorting, and controls if scores will be tracked as well. Defaults to * Applies when sorting, and controls if scores will be tracked as well.
* <tt>false</tt>. * Defaults to <tt>false</tt>.
*/ */
public SearchSourceBuilder trackScores(boolean trackScores) { public SearchSourceBuilder trackScores(boolean trackScores) {
this.trackScores = trackScores; this.trackScores = trackScores;
@ -401,6 +407,7 @@ public class SearchSourceBuilder implements ToXContent {
/** /**
* Set the rescore window size for rescores that don't specify their window. * Set the rescore window size for rescores that don't specify their window.
*
* @param defaultRescoreWindowSize * @param defaultRescoreWindowSize
* @return * @return
*/ */
@ -465,7 +472,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Indicates whether the response should contain the stored _source for every hit * Indicates whether the response should contain the stored _source for
* every hit
* *
* @param fetch * @param fetch
* @return * @return
@ -480,22 +488,33 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Indicate that _source should be returned with every hit, with an "include" and/or "exclude" set which can include simple wildcard * Indicate that _source should be returned with every hit, with an
* "include" and/or "exclude" set which can include simple wildcard
* elements. * elements.
* *
* @param include An optional include (optionally wildcarded) pattern to filter the returned _source * @param include
* @param exclude An optional exclude (optionally wildcarded) pattern to filter the returned _source * An optional include (optionally wildcarded) pattern to filter
* the returned _source
* @param exclude
* An optional exclude (optionally wildcarded) pattern to filter
* the returned _source
*/ */
public SearchSourceBuilder fetchSource(@Nullable String include, @Nullable String exclude) { public SearchSourceBuilder fetchSource(@Nullable String include, @Nullable String exclude) {
return fetchSource(include == null ? Strings.EMPTY_ARRAY : new String[]{include}, exclude == null ? Strings.EMPTY_ARRAY : new String[]{exclude}); return fetchSource(include == null ? Strings.EMPTY_ARRAY : new String[] { include }, exclude == null ? Strings.EMPTY_ARRAY
: new String[] { exclude });
} }
/** /**
* Indicate that _source should be returned with every hit, with an "include" and/or "exclude" set which can include simple wildcard * Indicate that _source should be returned with every hit, with an
* "include" and/or "exclude" set which can include simple wildcard
* elements. * elements.
* *
* @param includes An optional list of include (optionally wildcarded) pattern to filter the returned _source * @param includes
* @param excludes An optional list of exclude (optionally wildcarded) pattern to filter the returned _source * An optional list of include (optionally wildcarded) pattern to
* filter the returned _source
* @param excludes
* An optional list of exclude (optionally wildcarded) pattern to
* filter the returned _source
*/ */
public SearchSourceBuilder fetchSource(@Nullable String[] includes, @Nullable String[] excludes) { public SearchSourceBuilder fetchSource(@Nullable String[] includes, @Nullable String[] excludes) {
fetchSourceContext = new FetchSourceContext(includes, excludes); fetchSourceContext = new FetchSourceContext(includes, excludes);
@ -511,7 +530,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Sets no fields to be loaded, resulting in only id and type to be returned per field. * Sets no fields to be loaded, resulting in only id and type to be returned
* per field.
*/ */
public SearchSourceBuilder noFields() { public SearchSourceBuilder noFields() {
this.fieldNames = ImmutableList.of(); this.fieldNames = ImmutableList.of();
@ -519,8 +539,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Sets the fields to load and return as part of the search request. If none are specified, * Sets the fields to load and return as part of the search request. If none
* the source of the document will be returned. * are specified, the source of the document will be returned.
*/ */
public SearchSourceBuilder fields(List<String> fields) { public SearchSourceBuilder fields(List<String> fields) {
this.fieldNames = fields; this.fieldNames = fields;
@ -528,8 +548,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Adds the fields to load and return as part of the search request. If none are specified, * Adds the fields to load and return as part of the search request. If none
* the source of the document will be returned. * are specified, the source of the document will be returned.
*/ */
public SearchSourceBuilder fields(String... fields) { public SearchSourceBuilder fields(String... fields) {
if (fieldNames == null) { if (fieldNames == null) {
@ -542,8 +562,9 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Adds a field to load and return (note, it must be stored) as part of the search request. * Adds a field to load and return (note, it must be stored) as part of the
* If none are specified, the source of the document will be return. * search request. If none are specified, the source of the document will be
* return.
*/ */
public SearchSourceBuilder field(String name) { public SearchSourceBuilder field(String name) {
if (fieldNames == null) { if (fieldNames == null) {
@ -554,7 +575,8 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Adds a field to load from the field data cache and return as part of the search request. * Adds a field to load from the field data cache and return as part of the
* search request.
*/ */
public SearchSourceBuilder fieldDataField(String name) { public SearchSourceBuilder fieldDataField(String name) {
if (fieldDataFields == null) { if (fieldDataFields == null) {
@ -567,8 +589,10 @@ public class SearchSourceBuilder implements ToXContent {
/** /**
* Adds a script field under the given name with the provided script. * Adds a script field under the given name with the provided script.
* *
* @param name The name of the field * @param name
* @param script The script * The name of the field
* @param script
* The script
*/ */
public SearchSourceBuilder scriptField(String name, String script) { public SearchSourceBuilder scriptField(String name, String script) {
return scriptField(name, null, script, null); return scriptField(name, null, script, null);
@ -577,9 +601,12 @@ public class SearchSourceBuilder implements ToXContent {
/** /**
* Adds a script field. * Adds a script field.
* *
* @param name The name of the field * @param name
* @param script The script to execute * The name of the field
* @param params The script parameters * @param script
* The script to execute
* @param params
* The script parameters
*/ */
public SearchSourceBuilder scriptField(String name, String script, Map<String, Object> params) { public SearchSourceBuilder scriptField(String name, String script, Map<String, Object> params) {
return scriptField(name, null, script, params); return scriptField(name, null, script, params);
@ -588,10 +615,14 @@ public class SearchSourceBuilder implements ToXContent {
/** /**
* Adds a script field. * Adds a script field.
* *
* @param name The name of the field * @param name
* @param lang The language of the script * The name of the field
* @param script The script to execute * @param lang
* @param params The script parameters (can be <tt>null</tt>) * The language of the script
* @param script
* The script to execute
* @param params
* The script parameters (can be <tt>null</tt>)
*/ */
public SearchSourceBuilder scriptField(String name, String lang, String script, Map<String, Object> params) { public SearchSourceBuilder scriptField(String name, String lang, String script, Map<String, Object> params) {
if (scriptFields == null) { if (scriptFields == null) {
@ -602,10 +633,13 @@ public class SearchSourceBuilder implements ToXContent {
} }
/** /**
* Sets the boost a specific index will receive when the query is executeed against it. * Sets the boost a specific index will receive when the query is executeed
* against it.
* *
* @param index The index to apply the boost against * @param index
* @param indexBoost The boost to apply to the index * The index to apply the boost against
* @param indexBoost
* The boost to apply to the index
*/ */
public SearchSourceBuilder indexBoost(String index, float indexBoost) { public SearchSourceBuilder indexBoost(String index, float indexBoost) {
if (this.indexBoost == null) { if (this.indexBoost == null) {
@ -648,7 +682,6 @@ public class SearchSourceBuilder implements ToXContent {
} }
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -657,7 +690,7 @@ public class SearchSourceBuilder implements ToXContent {
return builder; return builder;
} }
public void innerToXContent(XContentBuilder builder, Params params) throws IOException{ public void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (from != -1) { if (from != -1) {
builder.field("from", from); builder.field("from", from);
} }
@ -899,8 +932,8 @@ public class SearchSourceBuilder implements ToXContent {
private PartialField(String name, String include, String exclude) { private PartialField(String name, String include, String exclude) {
this.name = name; this.name = name;
this.includes = include == null ? null : new String[]{include}; this.includes = include == null ? null : new String[] { include };
this.excludes = exclude == null ? null : new String[]{exclude}; this.excludes = exclude == null ? null : new String[] { exclude };
} }
public String name() { public String name() {

View File

@ -21,9 +21,17 @@ package org.elasticsearch.search.controller;
import com.carrotsearch.hppc.IntArrayList; import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap; import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.google.common.collect.Lists;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.*; import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -33,8 +41,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
@ -47,7 +58,12 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* *
@ -391,6 +407,19 @@ public class SearchPhaseController extends AbstractComponent {
} }
} }
if (aggregations != null) {
List<SiblingReducer> reducers = firstResult.reducers();
if (reducers != null) {
List<InternalAggregation> newAggs = new ArrayList<>(Lists.transform(aggregations.asList(), Reducer.AGGREGATION_TRANFORM_FUNCTION));
for (SiblingReducer reducer : reducers) {
InternalAggregation newAgg = reducer.doReduce(new InternalAggregations(newAggs), new ReduceContext(null, bigArrays,
scriptService));
newAggs.add(newAgg);
}
aggregations = new InternalAggregations(newAggs);
}
}
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore); InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
return new InternalSearchResponse(searchHits, aggregations, suggest, timedOut, terminatedEarly); return new InternalSearchResponse(searchHits, aggregations, suggest, timedOut, terminatedEarly);

View File

@ -20,15 +20,20 @@
package org.elasticsearch.search.query; package org.elasticsearch.search.query;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
@ -44,6 +49,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
private int size; private int size;
private TopDocs topDocs; private TopDocs topDocs;
private InternalAggregations aggregations; private InternalAggregations aggregations;
private List<SiblingReducer> reducers;
private Suggest suggest; private Suggest suggest;
private boolean searchTimedOut; private boolean searchTimedOut;
private Boolean terminatedEarly = null; private Boolean terminatedEarly = null;
@ -114,6 +120,14 @@ public class QuerySearchResult extends QuerySearchResultProvider {
this.aggregations = aggregations; this.aggregations = aggregations;
} }
public List<SiblingReducer> reducers() {
return reducers;
}
public void reducers(List<SiblingReducer> reducers) {
this.reducers = reducers;
}
public Suggest suggest() { public Suggest suggest() {
return suggest; return suggest;
} }
@ -162,6 +176,16 @@ public class QuerySearchResult extends QuerySearchResultProvider {
if (in.readBoolean()) { if (in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in); aggregations = InternalAggregations.readAggregations(in);
} }
if (in.readBoolean()) {
int size = in.readVInt();
List<SiblingReducer> reducers = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
Reducer reducer = ReducerStreams.stream(type).readResult(in);
reducers.add((SiblingReducer) reducer);
}
this.reducers = reducers;
}
if (in.readBoolean()) { if (in.readBoolean()) {
suggest = Suggest.readSuggest(Suggest.Fields.SUGGEST, in); suggest = Suggest.readSuggest(Suggest.Fields.SUGGEST, in);
} }
@ -187,6 +211,16 @@ public class QuerySearchResult extends QuerySearchResultProvider {
out.writeBoolean(true); out.writeBoolean(true);
aggregations.writeTo(out); aggregations.writeTo(out);
} }
if (reducers == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(reducers.size());
for (Reducer reducer : reducers) {
out.writeBytesReference(reducer.type().stream());
reducer.writeTo(out);
}
}
if (suggest == null) { if (suggest == null) {
out.writeBoolean(false); out.writeBoolean(false);
} else { } else {

View File

@ -23,8 +23,11 @@ import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilders;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
@ -40,6 +43,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertMatc
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
/** /**
* *
@ -111,6 +115,81 @@ public class PercolatorFacetsAndAggregationsTests extends ElasticsearchIntegrati
} }
} }
@Test
// Just test the integration with facets and aggregations, not the facet and aggregation functionality!
public void testAggregationsAndReducers() throws Exception {
assertAcked(prepareCreate("test").addMapping("type", "field1", "type=string", "field2", "type=string"));
ensureGreen();
int numQueries = scaledRandomIntBetween(250, 500);
int numUniqueQueries = between(1, numQueries / 2);
String[] values = new String[numUniqueQueries];
for (int i = 0; i < values.length; i++) {
values[i] = "value" + i;
}
int[] expectedCount = new int[numUniqueQueries];
logger.info("--> registering {} queries", numQueries);
for (int i = 0; i < numQueries; i++) {
String value = values[i % numUniqueQueries];
expectedCount[i % numUniqueQueries]++;
QueryBuilder queryBuilder = matchQuery("field1", value);
client().prepareIndex("test", PercolatorService.TYPE_NAME, Integer.toString(i))
.setSource(jsonBuilder().startObject().field("query", queryBuilder).field("field2", "b").endObject())
.execute().actionGet();
}
client().admin().indices().prepareRefresh("test").execute().actionGet();
for (int i = 0; i < numQueries; i++) {
String value = values[i % numUniqueQueries];
PercolateRequestBuilder percolateRequestBuilder = client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", value).endObject()));
SubAggCollectionMode aggCollectionMode = randomFrom(SubAggCollectionMode.values());
percolateRequestBuilder.addAggregation(AggregationBuilders.terms("a").field("field2")
.collectMode(aggCollectionMode ));
if (randomBoolean()) {
percolateRequestBuilder.setPercolateQuery(matchAllQuery());
}
if (randomBoolean()) {
percolateRequestBuilder.setScore(true);
} else {
percolateRequestBuilder.setSortByScore(true).setSize(numQueries);
}
boolean countOnly = randomBoolean();
if (countOnly) {
percolateRequestBuilder.setOnlyCount(countOnly);
}
percolateRequestBuilder.addAggregation(ReducerBuilders.maxBucket("max_a").setBucketsPaths("a>_count"));
PercolateResponse response = percolateRequestBuilder.execute().actionGet();
assertMatchCount(response, expectedCount[i % numUniqueQueries]);
if (!countOnly) {
assertThat(response.getMatches(), arrayWithSize(expectedCount[i % numUniqueQueries]));
}
Aggregations aggregations = response.getAggregations();
assertThat(aggregations.asList().size(), equalTo(2));
Terms terms = aggregations.get("a");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("a"));
List<Terms.Bucket> buckets = new ArrayList<>(terms.getBuckets());
assertThat(buckets.size(), equalTo(1));
assertThat(buckets.get(0).getKeyAsString(), equalTo("b"));
assertThat(buckets.get(0).getDocCount(), equalTo((long) expectedCount[i % values.length]));
InternalBucketMetricValue maxA = aggregations.get("max_a");
assertThat(maxA, notNullValue());
assertThat(maxA.getName(), equalTo("max_a"));
assertThat(maxA.value(), equalTo((double) expectedCount[i % values.length]));
assertThat(maxA.keys(), equalTo(new String[] {"b"}));
}
}
@Test @Test
public void testSignificantAggs() throws Exception { public void testSignificantAggs() throws Exception {
client().admin().indices().prepareCreate("test").execute().actionGet(); client().admin().indices().prepareCreate("test").execute().actionGet();

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.maxBucket;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsNull.notNullValue;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class MaxBucketTests extends ElasticsearchIntegrationTest {
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
static int numDocs;
static int interval;
static int minRandomValue;
static int maxRandomValue;
static int numValueBuckets;
static long[] valueCounts;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
numDocs = randomIntBetween(6, 20);
interval = randomIntBetween(2, 5);
minRandomValue = 0;
maxRandomValue = 20;
numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
valueCounts = new long[numValueBuckets];
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
builders.add(client().prepareIndex("idx", "type").setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + i).endObject()));
final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
valueCounts[bucket]++;
}
assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
for (int i = 0; i < 2; i++) {
builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
}
indexRandom(true, builders);
ensureSearchable();
}
@Test
public void singleValuedField() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.addAggregation(maxBucket("max_bucket").setBucketsPaths("histo>_count")).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
List<String> maxKeys = new ArrayList<>();
double maxValue = Double.NEGATIVE_INFINITY;
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
if (bucket.getDocCount() > maxValue) {
maxValue = bucket.getDocCount();
maxKeys = new ArrayList<>();
maxKeys.add(bucket.getKeyAsString());
} else if (bucket.getDocCount() == maxValue) {
maxKeys.add(bucket.getKeyAsString());
}
}
InternalBucketMetricValue maxBucketValue = response.getAggregations().get("max_bucket");
assertThat(maxBucketValue, notNullValue());
assertThat(maxBucketValue.getName(), equalTo("max_bucket"));
assertThat(maxBucketValue.value(), equalTo(maxValue));
assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()])));
}
}