review comments

This commit is contained in:
Colin Goodheart-Smithe 2016-02-15 10:46:24 +00:00
parent 1a46628daa
commit bd2e00d396
5 changed files with 96 additions and 95 deletions

View File

@ -52,8 +52,8 @@ public class PercolateSourceBuilder extends ToXContentToBytes {
private List<SortBuilder> sorts;
private Boolean trackScores;
private HighlightBuilder highlightBuilder;
private List<AggregatorBuilder<?>> aggregationFactorys;
private List<PipelineAggregatorBuilder> pipelineAggregationFactorys;
private List<AggregatorBuilder<?>> aggregationBuilders;
private List<PipelineAggregatorBuilder> pipelineAggregationBuilders;
/**
* Sets the document to run the percolate queries against.
@ -126,10 +126,10 @@ public class PercolateSourceBuilder extends ToXContentToBytes {
* Add an aggregation definition.
*/
public PercolateSourceBuilder addAggregation(AggregatorBuilder<?> aggregationBuilder) {
if (aggregationFactorys == null) {
aggregationFactorys = new ArrayList<>();
if (aggregationBuilders == null) {
aggregationBuilders = new ArrayList<>();
}
aggregationFactorys.add(aggregationBuilder);
aggregationBuilders.add(aggregationBuilder);
return this;
}
@ -137,10 +137,10 @@ public class PercolateSourceBuilder extends ToXContentToBytes {
* Add an aggregation definition.
*/
public PercolateSourceBuilder addAggregation(PipelineAggregatorBuilder aggregationBuilder) {
if (pipelineAggregationFactorys == null) {
pipelineAggregationFactorys = new ArrayList<>();
if (pipelineAggregationBuilders == null) {
pipelineAggregationBuilders = new ArrayList<>();
}
pipelineAggregationFactorys.add(aggregationBuilder);
pipelineAggregationBuilders.add(aggregationBuilder);
return this;
}
@ -172,16 +172,16 @@ public class PercolateSourceBuilder extends ToXContentToBytes {
if (highlightBuilder != null) {
highlightBuilder.toXContent(builder, params);
}
if (aggregationFactorys != null || pipelineAggregationFactorys != null) {
if (aggregationBuilders != null || pipelineAggregationBuilders != null) {
builder.field("aggregations");
builder.startObject();
if (aggregationFactorys != null) {
for (AggregatorBuilder<?> aggregation : aggregationFactorys) {
if (aggregationBuilders != null) {
for (AggregatorBuilder<?> aggregation : aggregationBuilders) {
aggregation.toXContent(builder, params);
}
}
if (pipelineAggregationFactorys != null) {
for (PipelineAggregatorBuilder aggregation : pipelineAggregationFactorys) {
if (pipelineAggregationBuilders != null) {
for (PipelineAggregatorBuilder aggregation : pipelineAggregationBuilders) {
aggregation.toXContent(builder, params);
}
}

View File

@ -645,15 +645,15 @@ public abstract class StreamOutput extends OutputStream {
/**
* Writes a {@link AggregatorBuilder} to the current stream
*/
public void writeAggregatorFactory(AggregatorBuilder factory) throws IOException {
writeNamedWriteable(factory);
public void writeAggregatorBuilder(AggregatorBuilder<?> builder) throws IOException {
writeNamedWriteable(builder);
}
/**
* Writes a {@link PipelineAggregatorBuilder} to the current stream
*/
public void writePipelineAggregatorFactory(PipelineAggregatorBuilder factory) throws IOException {
writeNamedWriteable(factory);
public void writePipelineAggregatorBuilder(PipelineAggregatorBuilder<?> builder) throws IOException {
writeNamedWriteable(builder);
}
/**

View File

@ -145,7 +145,7 @@ public class PercolatorService extends AbstractComponent implements Releasable {
long finalCount = 0;
for (PercolateShardResponse shardResponse : shardResponses) {
finalCount += shardResponse.topDocs().totalHits;
}
}
InternalAggregations reducedAggregations = reduceAggregations(shardResponses);
return new PercolatorService.ReduceResult(finalCount, reducedAggregations);
@ -269,24 +269,26 @@ public class PercolatorService extends AbstractComponent implements Releasable {
} else {
int size = context.size();
if (size > context.searcher().getIndexReader().maxDoc()) {
// prevent easy OOM if more than the total number of docs that exist is requested...
// prevent easy OOM if more than the total number of docs that
// exist is requested...
size = context.searcher().getIndexReader().maxDoc();
}
}
TopScoreDocCollector collector = TopScoreDocCollector.create(size);
context.searcher().search(percolatorQuery, MultiCollector.wrap(collector, aggregatorCollector));
if (aggregatorCollector != null) {
aggregatorCollector.postCollection();
aggregationPhase.execute(context);
}
}
TopDocs topDocs = collector.topDocs();
Map<Integer, String> ids = new HashMap<>(topDocs.scoreDocs.length);
Map<Integer, Map<String, HighlightField>> hls = new HashMap<>(topDocs.scoreDocs.length);
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
if (context.trackScores() == false) {
// No sort or tracking scores was provided, so use special value to indicate to not show the scores:
// No sort or tracking scores was provided, so use special
// value to indicate to not show the scores:
scoreDoc.score = NO_SCORE;
}
}
int segmentIdx = ReaderUtil.subIndex(scoreDoc.doc, context.searcher().getIndexReader().leaves());
LeafReaderContext atomicReaderContext = context.searcher().getIndexReader().leaves().get(segmentIdx);
@ -301,11 +303,11 @@ public class PercolatorService extends AbstractComponent implements Releasable {
context.hitContext().cache().clear();
highlightPhase.hitExecute(context, context.hitContext());
hls.put(scoreDoc.doc, context.hitContext().hit().getHighlightFields());
}
}
return new PercolateShardResponse(topDocs, ids, hls, context);
}
}
return new PercolateShardResponse(topDocs, ids, hls, context);
}
}
@Override
public void close() {
@ -329,11 +331,12 @@ public class PercolatorService extends AbstractComponent implements Releasable {
return (InternalAggregation) p;
}).collect(Collectors.toList());
for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), new InternalAggregation.ReduceContext(bigArrays, scriptService));
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs),
new InternalAggregation.ReduceContext(bigArrays, scriptService));
newAggs.add(newAgg);
}
}
aggregations = new InternalAggregations(newAggs);
}
}
}
return aggregations;
}

View File

@ -128,7 +128,7 @@ public class AggregatorFactories {
private final Set<String> names = new HashSet<>();
private final List<AggregatorBuilder<?>> aggregatorBuilders = new ArrayList<>();
private final List<PipelineAggregatorBuilder<?>> pipelineAggregatorFactories = new ArrayList<>();
private final List<PipelineAggregatorBuilder<?>> pipelineAggregatorBuilders = new ArrayList<>();
private boolean skipResolveOrder;
public Builder addAggregators(AggregatorFactories factories) {
@ -144,7 +144,7 @@ public class AggregatorFactories {
}
public Builder addPipelineAggregator(PipelineAggregatorBuilder<?> pipelineAggregatorFactory) {
this.pipelineAggregatorFactories.add(pipelineAggregatorFactory);
this.pipelineAggregatorBuilders.add(pipelineAggregatorFactory);
return this;
}
@ -157,14 +157,14 @@ public class AggregatorFactories {
}
public AggregatorFactories build(AggregationContext context, AggregatorFactory<?> parent) throws IOException {
if (aggregatorBuilders.isEmpty() && pipelineAggregatorFactories.isEmpty()) {
if (aggregatorBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
return EMPTY;
}
List<PipelineAggregatorBuilder<?>> orderedpipelineAggregators = null;
if (skipResolveOrder) {
orderedpipelineAggregators = new ArrayList<>(pipelineAggregatorFactories);
orderedpipelineAggregators = new ArrayList<>(pipelineAggregatorBuilders);
} else {
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorFactories, this.aggregatorBuilders);
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorBuilders, this.aggregatorBuilders);
}
AggregatorFactory<?>[] aggFactories = new AggregatorFactory<?>[aggregatorBuilders.size()];
for (int i = 0; i < aggregatorBuilders.size(); i++) {
@ -174,42 +174,42 @@ public class AggregatorFactories {
}
private List<PipelineAggregatorBuilder<?>> resolvePipelineAggregatorOrder(
List<PipelineAggregatorBuilder<?>> pipelineAggregatorFactories, List<AggregatorBuilder<?>> aggFactories) {
Map<String, PipelineAggregatorBuilder<?>> pipelineAggregatorFactoriesMap = new HashMap<>();
for (PipelineAggregatorBuilder<?> factory : pipelineAggregatorFactories) {
pipelineAggregatorFactoriesMap.put(factory.getName(), factory);
List<PipelineAggregatorBuilder<?>> pipelineAggregatorBuilders, List<AggregatorBuilder<?>> aggBuilders) {
Map<String, PipelineAggregatorBuilder<?>> pipelineAggregatorBuildersMap = new HashMap<>();
for (PipelineAggregatorBuilder<?> builder : pipelineAggregatorBuilders) {
pipelineAggregatorBuildersMap.put(builder.getName(), builder);
}
Map<String, AggregatorBuilder<?>> aggFactoriesMap = new HashMap<>();
for (AggregatorBuilder<?> aggFactory : aggFactories) {
aggFactoriesMap.put(aggFactory.name, aggFactory);
Map<String, AggregatorBuilder<?>> aggBuildersMap = new HashMap<>();
for (AggregatorBuilder<?> aggBuilder : aggBuilders) {
aggBuildersMap.put(aggBuilder.name, aggBuilder);
}
List<PipelineAggregatorBuilder<?>> orderedPipelineAggregatorrs = new LinkedList<>();
List<PipelineAggregatorBuilder<?>> unmarkedFactories = new ArrayList<PipelineAggregatorBuilder<?>>(pipelineAggregatorFactories);
List<PipelineAggregatorBuilder<?>> unmarkedBuilders = new ArrayList<PipelineAggregatorBuilder<?>>(pipelineAggregatorBuilders);
Set<PipelineAggregatorBuilder<?>> temporarilyMarked = new HashSet<PipelineAggregatorBuilder<?>>();
while (!unmarkedFactories.isEmpty()) {
PipelineAggregatorBuilder<?> factory = unmarkedFactories.get(0);
resolvePipelineAggregatorOrder(aggFactoriesMap, pipelineAggregatorFactoriesMap, orderedPipelineAggregatorrs,
unmarkedFactories, temporarilyMarked, factory);
while (!unmarkedBuilders.isEmpty()) {
PipelineAggregatorBuilder<?> builder = unmarkedBuilders.get(0);
resolvePipelineAggregatorOrder(aggBuildersMap, pipelineAggregatorBuildersMap, orderedPipelineAggregatorrs, unmarkedBuilders,
temporarilyMarked, builder);
}
return orderedPipelineAggregatorrs;
}
private void resolvePipelineAggregatorOrder(Map<String, AggregatorBuilder<?>> aggFactoriesMap,
Map<String, PipelineAggregatorBuilder<?>> pipelineAggregatorFactoriesMap,
List<PipelineAggregatorBuilder<?>> orderedPipelineAggregators, List<PipelineAggregatorBuilder<?>> unmarkedFactories,
Set<PipelineAggregatorBuilder<?>> temporarilyMarked, PipelineAggregatorBuilder<?> factory) {
if (temporarilyMarked.contains(factory)) {
throw new IllegalArgumentException("Cyclical dependancy found with pipeline aggregator [" + factory.getName() + "]");
} else if (unmarkedFactories.contains(factory)) {
temporarilyMarked.add(factory);
String[] bucketsPaths = factory.getBucketsPaths();
private void resolvePipelineAggregatorOrder(Map<String, AggregatorBuilder<?>> aggBuildersMap,
Map<String, PipelineAggregatorBuilder<?>> pipelineAggregatorBuildersMap,
List<PipelineAggregatorBuilder<?>> orderedPipelineAggregators, List<PipelineAggregatorBuilder<?>> unmarkedBuilders,
Set<PipelineAggregatorBuilder<?>> temporarilyMarked, PipelineAggregatorBuilder<?> builder) {
if (temporarilyMarked.contains(builder)) {
throw new IllegalArgumentException("Cyclical dependancy found with pipeline aggregator [" + builder.getName() + "]");
} else if (unmarkedBuilders.contains(builder)) {
temporarilyMarked.add(builder);
String[] bucketsPaths = builder.getBucketsPaths();
for (String bucketsPath : bucketsPaths) {
List<AggregationPath.PathElement> bucketsPathElements = AggregationPath.parse(bucketsPath).getPathElements();
String firstAggName = bucketsPathElements.get(0).name;
if (bucketsPath.equals("_count") || bucketsPath.equals("_key")) {
continue;
} else if (aggFactoriesMap.containsKey(firstAggName)) {
AggregatorBuilder<?> aggFactory = aggFactoriesMap.get(firstAggName);
} else if (aggBuildersMap.containsKey(firstAggName)) {
AggregatorBuilder<?> aggBuilder = aggBuildersMap.get(firstAggName);
for (int i = 1; i < bucketsPathElements.size(); i++) {
PathElement pathElement = bucketsPathElements.get(i);
String aggName = pathElement.name;
@ -218,26 +218,26 @@ public class AggregatorFactories {
} else {
// Check the non-pipeline sub-aggregator
// factories
AggregatorBuilder<?>[] subFactories = aggFactory.factoriesBuilder.getAggregatorFactories();
boolean foundSubFactory = false;
for (AggregatorBuilder<?> subFactory : subFactories) {
if (aggName.equals(subFactory.name)) {
aggFactory = subFactory;
foundSubFactory = true;
AggregatorBuilder<?>[] subBuilders = aggBuilder.factoriesBuilder.getAggregatorFactories();
boolean foundSubBuilder = false;
for (AggregatorBuilder<?> subBuilder : subBuilders) {
if (aggName.equals(subBuilder.name)) {
aggBuilder = subBuilder;
foundSubBuilder = true;
break;
}
}
// Check the pipeline sub-aggregator factories
if (!foundSubFactory && (i == bucketsPathElements.size() - 1)) {
List<PipelineAggregatorBuilder<?>> subPipelineFactories = aggFactory.factoriesBuilder.pipelineAggregatorFactories;
for (PipelineAggregatorBuilder<?> subFactory : subPipelineFactories) {
if (!foundSubBuilder && (i == bucketsPathElements.size() - 1)) {
List<PipelineAggregatorBuilder<?>> subPipelineBuilders = aggBuilder.factoriesBuilder.pipelineAggregatorBuilders;
for (PipelineAggregatorBuilder<?> subFactory : subPipelineBuilders) {
if (aggName.equals(subFactory.name())) {
foundSubFactory = true;
foundSubBuilder = true;
break;
}
}
}
if (!foundSubFactory) {
if (!foundSubBuilder) {
throw new IllegalArgumentException("No aggregation [" + aggName + "] found for path [" + bucketsPath
+ "]");
}
@ -245,19 +245,18 @@ public class AggregatorFactories {
}
continue;
} else {
PipelineAggregatorBuilder<?> matchingFactory = pipelineAggregatorFactoriesMap.get(firstAggName);
if (matchingFactory != null) {
resolvePipelineAggregatorOrder(aggFactoriesMap, pipelineAggregatorFactoriesMap, orderedPipelineAggregators,
unmarkedFactories,
temporarilyMarked, matchingFactory);
PipelineAggregatorBuilder<?> matchingBuilder = pipelineAggregatorBuildersMap.get(firstAggName);
if (matchingBuilder != null) {
resolvePipelineAggregatorOrder(aggBuildersMap, pipelineAggregatorBuildersMap, orderedPipelineAggregators,
unmarkedBuilders, temporarilyMarked, matchingBuilder);
} else {
throw new IllegalArgumentException("No aggregation found for path [" + bucketsPath + "]");
}
}
}
unmarkedFactories.remove(factory);
temporarilyMarked.remove(factory);
orderedPipelineAggregators.add(factory);
unmarkedBuilders.remove(builder);
temporarilyMarked.remove(builder);
orderedPipelineAggregators.add(builder);
}
}
@ -266,11 +265,11 @@ public class AggregatorFactories {
}
List<PipelineAggregatorBuilder<?>> getPipelineAggregatorFactories() {
return this.pipelineAggregatorFactories;
return this.pipelineAggregatorBuilders;
}
public int count() {
return aggregatorBuilders.size() + pipelineAggregatorFactories.size();
return aggregatorBuilders.size() + pipelineAggregatorBuilders.size();
}
@Override
@ -293,11 +292,11 @@ public class AggregatorFactories {
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(this.aggregatorBuilders.size());
for (AggregatorBuilder<?> factory : aggregatorBuilders) {
out.writeAggregatorFactory(factory);
out.writeAggregatorBuilder(factory);
}
out.writeVInt(this.pipelineAggregatorFactories.size());
for (PipelineAggregatorBuilder<?> factory : pipelineAggregatorFactories) {
out.writePipelineAggregatorFactory(factory);
out.writeVInt(this.pipelineAggregatorBuilders.size());
for (PipelineAggregatorBuilder<?> factory : pipelineAggregatorBuilders) {
out.writePipelineAggregatorBuilder(factory);
}
}
@ -309,8 +308,8 @@ public class AggregatorFactories {
subAgg.toXContent(builder, params);
}
}
if (pipelineAggregatorFactories != null) {
for (PipelineAggregatorBuilder<?> subAgg : pipelineAggregatorFactories) {
if (pipelineAggregatorBuilders != null) {
for (PipelineAggregatorBuilder<?> subAgg : pipelineAggregatorBuilders) {
subAgg.toXContent(builder, params);
}
}
@ -320,7 +319,7 @@ public class AggregatorFactories {
@Override
public int hashCode() {
return Objects.hash(aggregatorBuilders, pipelineAggregatorFactories);
return Objects.hash(aggregatorBuilders, pipelineAggregatorBuilders);
}
@Override
@ -332,7 +331,7 @@ public class AggregatorFactories {
Builder other = (Builder) obj;
if (!Objects.equals(aggregatorBuilders, other.aggregatorBuilders))
return false;
if (!Objects.equals(pipelineAggregatorFactories, other.pipelineAggregatorFactories))
if (!Objects.equals(pipelineAggregatorBuilders, other.pipelineAggregatorBuilders))
return false;
return true;
}

View File

@ -81,7 +81,7 @@ public class GeoHashGridParser extends GeoPointValuesSourceParser {
Integer precision = (Integer) otherOptions.get(GeoHashGridParams.FIELD_PRECISION);
if (precision != null) {
factory.precision(precision);
}
}
Integer size = (Integer) otherOptions.get(GeoHashGridParams.FIELD_SIZE);
if (size != null) {
factory.size(size);
@ -106,10 +106,10 @@ public class GeoHashGridParser extends GeoPointValuesSourceParser {
} else if (parseFieldMatcher.match(currentFieldName, GeoHashGridParams.FIELD_SHARD_SIZE)) {
otherOptions.put(GeoHashGridParams.FIELD_SHARD_SIZE, parser.intValue());
return true;
}
}
}
return false;
}
}
public static class GeoGridAggregatorBuilder extends ValuesSourceAggregatorBuilder<ValuesSource.GeoPoint, GeoGridAggregatorBuilder> {
@ -121,7 +121,7 @@ public class GeoHashGridParser extends GeoPointValuesSourceParser {
public GeoGridAggregatorBuilder(String name) {
super(name, InternalGeoHashGrid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
}
}
public GeoGridAggregatorBuilder precision(int precision) {
this.precision = GeoHashGridParams.checkPrecision(precision);
@ -185,9 +185,8 @@ public class GeoHashGridParser extends GeoPointValuesSourceParser {
}
@Override
protected GeoGridAggregatorBuilder innerReadFrom(
String name, ValuesSourceType valuesSourceType,
ValueType targetValueType, StreamInput in) throws IOException {
protected GeoGridAggregatorBuilder innerReadFrom(String name, ValuesSourceType valuesSourceType, ValueType targetValueType,
StreamInput in) throws IOException {
GeoGridAggregatorBuilder factory = new GeoGridAggregatorBuilder(name);
factory.precision = in.readVInt();
factory.requiredSize = in.readVInt();
@ -200,7 +199,7 @@ public class GeoHashGridParser extends GeoPointValuesSourceParser {
out.writeVInt(precision);
out.writeVInt(requiredSize);
out.writeVInt(shardSize);
}
}
@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
@ -221,7 +220,7 @@ public class GeoHashGridParser extends GeoPointValuesSourceParser {
}
if (shardSize != other.shardSize) {
return false;
}
}
return true;
}