Aggregations: removed aggregations from ReduceContext

ReduceContext contains the list of aggregations to reduce but these aggregations are set as null half of the time. This change makes the reduce(ReduceContext) method changed to reduce(List<InternalAggregation>, ReduceContext) and ReduceContext now only holds the BigArrays and Script services.
This commit is contained in:
Colin Goodheart-Smithe 2015-04-09 14:28:37 +01:00
parent 3b41299273
commit fcc09f62b9
25 changed files with 88 additions and 70 deletions

View File

@ -19,11 +19,19 @@
package org.elasticsearch.percolator;
import com.carrotsearch.hppc.ByteObjectOpenHashMap;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.memory.ExtendedMemoryIndex;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.*;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
@ -58,14 +66,21 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.*;
import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget;
@ -83,7 +98,9 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.percolator.QueryCollector.*;
import static org.elasticsearch.percolator.QueryCollector.count;
import static org.elasticsearch.percolator.QueryCollector.match;
import static org.elasticsearch.percolator.QueryCollector.matchAndScore;
public class PercolatorService extends AbstractComponent {
@ -834,7 +851,7 @@ public class PercolatorService extends AbstractComponent {
for (PercolateShardResponse shardResult : shardResults) {
aggregationsList.add(shardResult.aggregations());
}
return InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
return InternalAggregations.reduce(aggregationsList, new ReduceContext(bigArrays, scriptService));
}
}

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -89,20 +88,14 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
public static class ReduceContext {
private final List<InternalAggregation> aggregations;
private final BigArrays bigArrays;
private ScriptService scriptService;
public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays, ScriptService scriptService) {
this.aggregations = aggregations;
public ReduceContext(BigArrays bigArrays, ScriptService scriptService) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
}
public List<InternalAggregation> aggregations() {
return aggregations;
}
public BigArrays bigArrays() {
return bigArrays;
}
@ -146,7 +139,7 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
* try reusing an existing get instance (typically the first in the given list) to save on redundant object
* construction.
*/
public abstract InternalAggregation reduce(ReduceContext reduceContext);
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
@Override
public Object getProperty(String path) {

View File

@ -169,7 +169,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
List<InternalAggregation> aggregations = entry.getValue();
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context.scriptService())));
reducedAggregations.add(first.reduce(aggregations, context));
}
return new InternalAggregations(reducedAggregations);
}

View File

@ -69,8 +69,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long docCount = 0L;
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {

View File

@ -191,8 +191,7 @@ public class InternalFilters extends InternalMultiBucketAggregation implements F
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<List<Bucket>> bucketsList = null;
for (InternalAggregation aggregation : aggregations) {
InternalFilters filters = (InternalFilters) aggregation;

View File

@ -188,8 +188,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation implemen
}
@Override
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalGeoHashGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
LongObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {

View File

@ -297,8 +297,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
private List<B> reduceBuckets(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
private List<B> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<IteratorAndCurrent<B>>(aggregations.size()) {
@Override
@ -412,8 +411,8 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<B> reducedBuckets = reduceBuckets(reduceContext);
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<B> reducedBuckets = reduceBuckets(aggregations, reduceContext);
// adding empty buckets if needed
if (minDocCount == 0) {

View File

@ -260,8 +260,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalMulti
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
@SuppressWarnings("unchecked")
List<Bucket>[] rangeList = new List[ranges.size()];
for (int i = 0; i < rangeList.length; ++i) {

View File

@ -156,8 +156,7 @@ public abstract class InternalSignificantTerms extends InternalMultiBucketAggreg
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long globalSubsetSize = 0;
long globalSupersetSize = 0;

View File

@ -26,7 +26,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.JLHScore;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -68,10 +67,10 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
for (InternalAggregation aggregation : reduceContext.aggregations()) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation aggregation : aggregations) {
if (!(aggregation instanceof UnmappedSignificantTerms)) {
return aggregation.reduce(reduceContext);
return aggregation.reduce(aggregations, reduceContext);
}
}
return this;

View File

@ -162,8 +162,7 @@ public abstract class InternalTerms extends InternalMultiBucketAggregation imple
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
long sumDocCountError = 0;

View File

@ -81,10 +81,10 @@ public class UnmappedTerms extends InternalTerms {
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
for (InternalAggregation agg : reduceContext.aggregations()) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation agg : aggregations) {
if (!(agg instanceof UnmappedTerms)) {
return agg.reduce(reduceContext);
return agg.reduce(aggregations, reduceContext);
}
}
return this;

View File

@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -79,10 +80,10 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalAvg reduce(ReduceContext reduceContext) {
public InternalAvg reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
count += ((InternalAvg) aggregation).count;
sum += ((InternalAvg) aggregation).sum;
}

View File

@ -99,8 +99,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
InternalCardinality reduced = null;
for (InternalAggregation aggregation : aggregations) {
final InternalCardinality cardinality = (InternalCardinality) aggregation;

View File

@ -73,7 +73,7 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double top = Double.NEGATIVE_INFINITY;
double bottom = Double.POSITIVE_INFINITY;
double posLeft = Double.POSITIVE_INFINITY;
@ -81,7 +81,7 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
double negLeft = Double.POSITIVE_INFINITY;
double negRight = Double.NEGATIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
if (bounds.top > top) {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -77,9 +78,9 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalMax reduce(ReduceContext reduceContext) {
public InternalMax reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double max = Double.NEGATIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
max = Math.max(max, ((InternalMax) aggregation).max);
}
return new InternalMax(name, max, valueFormatter, getMetaData());

View File

@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -78,9 +79,9 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalMin reduce(ReduceContext reduceContext) {
public InternalMin reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double min = Double.POSITIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
min = Math.min(min, ((InternalMin) aggregation).min);
}
return new InternalMin(getName(), min, this.valueFormatter, getMetaData());

View File

@ -60,8 +60,7 @@ abstract class AbstractInternalPercentiles extends InternalNumericMetricsAggrega
public abstract double value(double key);
@Override
public AbstractInternalPercentiles reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public AbstractInternalPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
TDigestState merged = null;
for (InternalAggregation aggregation : aggregations) {
final AbstractInternalPercentiles percentiles = (AbstractInternalPercentiles) aggregation;

View File

@ -82,13 +82,13 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implement
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Object> aggregationObjects = new ArrayList<>();
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation;
aggregationObjects.add(mapReduceAggregation.aggregation());
}
InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) reduceContext.aggregations().get(0));
InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) aggregations.get(0));
Object aggregation;
if (firstAggregation.reduceScript != null) {
Map<String, Object> params;

View File

@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -148,12 +149,12 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
}
@Override
public InternalStats reduce(ReduceContext reduceContext) {
public InternalStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalStats stats = (InternalStats) aggregation;
count += stats.getCount();
min = Math.min(min, stats.getMin());

View File

@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -143,13 +144,13 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
}
@Override
public InternalExtendedStats reduce(ReduceContext reduceContext) {
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double sumOfSqrs = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalExtendedStats stats = (InternalExtendedStats) aggregation;
sumOfSqrs += stats.getSumOfSquares();
}
final InternalStats stats = super.reduce(reduceContext);
final InternalStats stats = super.reduce(aggregations, reduceContext);
return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs, sigma, valueFormatter, getMetaData());
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -77,9 +78,9 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalSum reduce(ReduceContext reduceContext) {
public InternalSum reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
sum += ((InternalSum) aggregation).sum;
}
return new InternalSum(name, sum, valueFormatter, getMetaData());

View File

@ -18,9 +18,6 @@
*/
package org.elasticsearch.search.aggregations.metrics.tophits;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
@ -38,6 +35,9 @@ import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import java.io.IOException;
import java.util.List;
/**
*/
public class InternalTopHits extends InternalMetricsAggregation implements TopHits {
@ -85,8 +85,7 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
InternalSearchHits[] shardHits = new InternalSearchHits[aggregations.size()];
final TopDocs reducedTopDocs;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -76,9 +77,9 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long valueCount = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
valueCount += ((InternalValueCount) aggregation).value;
}
return new InternalValueCount(name, valueCount, valueFormatter, getMetaData());

View File

@ -23,7 +23,14 @@ import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent;
@ -47,7 +54,12 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -387,7 +399,7 @@ public class SearchPhaseController extends AbstractComponent {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
}
aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(bigArrays, scriptService));
}
}