From 61aba891107fe77877e43a8e3379c498a57b429d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 26 Nov 2013 23:02:26 +0100 Subject: [PATCH] Added aggregation support to the percolate api. Closes #4245 --- .../percolate/PercolateRequestBuilder.java | 9 ++ .../action/percolate/PercolateResponse.java | 15 +++- .../percolate/PercolateShardResponse.java | 63 ++++++------- .../percolate/PercolateSourceBuilder.java | 22 +++++ .../percolate/TransportPercolateAction.java | 2 +- .../percolator/PercolateContext.java | 6 +- .../percolator/PercolatorService.java | 90 +++++++++++-------- .../percolator/QueryCollector.java | 60 ++++++++++--- .../search/aggregations/AggregationPhase.java | 4 +- .../aggregations/InternalAggregations.java | 4 + ...PercolatorFacetsAndAggregationsTests.java} | 42 +++++++-- 11 files changed, 218 insertions(+), 99 deletions(-) rename src/test/java/org/elasticsearch/percolator/{PercolatorFacetsTests.java => PercolatorFacetsAndAggregationsTests.java} (65%) diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java b/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java index 078be2e1ddb..396db7b0fe8 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateRequestBuilder.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.facet.FacetBuilder; import org.elasticsearch.search.highlight.HighlightBuilder; @@ -164,6 +165,14 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder shardFailures, - Match[] matches, long count, long tookInMillis, InternalFacets facets) { + Match[] matches, long count, long tookInMillis, InternalFacets facets, InternalAggregations aggregations) { super(totalShards, successfulShards, failedShards, shardFailures); this.tookInMillis = tookInMillis; this.matches = matches; this.count = count; this.facets = facets; + this.aggregations = aggregations; } public PercolateResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, long tookInMillis) { @@ -97,6 +100,10 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite return facets; } + public InternalAggregations getAggregations() { + return aggregations; + } + @Override public Iterator iterator() { return Arrays.asList(matches).iterator(); @@ -151,6 +158,10 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite facets.toXContent(builder, params); } + if (aggregations != null) { + aggregations.toXContent(builder, params); + } + builder.endObject(); return builder; } @@ -167,6 +178,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite matches[i].readFrom(in); } facets = InternalFacets.readOptionalFacets(in); + aggregations = InternalAggregations.readOptionalAggregations(in); } @Override @@ -179,6 +191,7 @@ public class PercolateResponse extends BroadcastOperationResponse implements Ite match.writeTo(out); } out.writeOptionalStreamable(facets); + out.writeOptionalStreamable(aggregations); } public static class Match implements Streamable { diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java b/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java index d7f79376608..f7a965bb4e8 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateShardResponse.java @@ -19,13 +19,16 @@ package org.elasticsearch.action.percolate; +import com.google.common.collect.ImmutableList; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.percolator.PercolateContext; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.facet.InternalFacets; import org.elasticsearch.search.highlight.HighlightField; +import org.elasticsearch.search.query.QuerySearchResult; import java.io.IOException; import java.util.ArrayList; @@ -37,18 +40,22 @@ import java.util.Map; */ public class PercolateShardResponse extends BroadcastShardOperationResponse { - private static final BytesRef[] EMPTY = new BytesRef[0]; + private static final BytesRef[] EMPTY_MATCHES = new BytesRef[0]; + private static final float[] EMPTY_SCORES = new float[0]; + private static final List> EMPTY_HL = ImmutableList.of(); private long count; private float[] scores; private BytesRef[] matches; - private List> hls = new ArrayList>(); + private List> hls; private byte percolatorTypeId; private int requestedSize; private InternalFacets facets; + private InternalAggregations aggregations; PercolateShardResponse() { + hls = new ArrayList>(); } public PercolateShardResponse(BytesRef[] matches, List> hls, long count, float[] scores, PercolateContext context, String index, int shardId) { @@ -59,45 +66,31 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { this.scores = scores; this.percolatorTypeId = context.percolatorTypeId; this.requestedSize = context.size; - buildFacets(context); + QuerySearchResult result = context.queryResult(); + if (result != null) { + if (result.facets() != null) { + this.facets = new InternalFacets(result.facets().facets()); + } + if (result.aggregations() != null) { + this.aggregations = (InternalAggregations) result.aggregations(); + } + } } public PercolateShardResponse(BytesRef[] matches, long count, float[] scores, PercolateContext context, String index, int shardId) { - super(index, shardId); - this.matches = matches; - this.count = count; - this.scores = scores; - this.percolatorTypeId = context.percolatorTypeId; - this.requestedSize = context.size; - buildFacets(context); + this(matches, EMPTY_HL, count, scores, context, index, shardId); } public PercolateShardResponse(BytesRef[] matches, List> hls, long count, PercolateContext context, String index, int shardId) { - super(index, shardId); - this.matches = matches; - this.hls = hls; - this.scores = new float[0]; - this.count = count; - this.percolatorTypeId = context.percolatorTypeId; - this.requestedSize = context.size; - buildFacets(context); + this(matches, hls, count, EMPTY_SCORES, context, index, shardId); } public PercolateShardResponse(long count, PercolateContext context, String index, int shardId) { - super(index, shardId); - this.count = count; - this.matches = EMPTY; - this.scores = new float[0]; - this.percolatorTypeId = context.percolatorTypeId; - this.requestedSize = context.size; - buildFacets(context); + this(EMPTY_MATCHES, EMPTY_HL, count, EMPTY_SCORES, context, index, shardId); } public PercolateShardResponse(PercolateContext context, String index, int shardId) { - super(index, shardId); - this.matches = EMPTY; - this.scores = new float[0]; - this.requestedSize = context.size; + this(EMPTY_MATCHES, EMPTY_HL, 0, EMPTY_SCORES, context, index, shardId); } public BytesRef[] matches() { @@ -124,6 +117,10 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { return facets; } + public InternalAggregations aggregations() { + return aggregations; + } + public byte percolatorTypeId() { return percolatorTypeId; } @@ -156,6 +153,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { hls.add(fields); } facets = InternalFacets.readOptionalFacets(in); + aggregations = InternalAggregations.readOptionalAggregations(in); } @Override @@ -181,11 +179,6 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse { } } out.writeOptionalStreamable(facets); - } - - private void buildFacets(PercolateContext context) { - if (context.queryResult() != null && context.queryResult().facets() != null) { - this.facets = new InternalFacets(context.queryResult().facets().facets()); - } + out.writeOptionalStreamable(aggregations); } } diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java b/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java index 278ca9f23a8..01d5ea60d21 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateSourceBuilder.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilderException; import org.elasticsearch.search.facet.FacetBuilder; import org.elasticsearch.search.highlight.HighlightBuilder; @@ -48,6 +50,7 @@ public class PercolateSourceBuilder implements ToXContent { private Boolean score; private HighlightBuilder highlightBuilder; private List facets; + private List aggregations; public DocBuilder percolateDocument() { if (docBuilder == null) { @@ -137,6 +140,17 @@ public class PercolateSourceBuilder implements ToXContent { return this; } + /** + * Add an aggregationB definition. + */ + public PercolateSourceBuilder addAggregation(AggregationBuilder aggregationBuilder) { + if (aggregations == null) { + aggregations = Lists.newArrayList(); + } + aggregations.add(aggregationBuilder); + return this; + } + public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException { try { XContentBuilder builder = XContentFactory.contentBuilder(contentType); @@ -181,6 +195,14 @@ public class PercolateSourceBuilder implements ToXContent { } builder.endObject(); } + if (aggregations != null) { + builder.field("aggregations"); + builder.startObject(); + for (AbstractAggregationBuilder aggregation : aggregations) { + aggregation.toXContent(builder, params); + } + builder.endObject(); + } builder.endObject(); return builder; } diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java index cc92c5fa921..8c6100d55d6 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java @@ -162,7 +162,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction< long tookInMillis = System.currentTimeMillis() - request.startTime; return new PercolateResponse( shardsResponses.length(), successfulShards, failedShards, shardFailures, - result.matches(), result.count(), tookInMillis, result.reducedFacets() + result.matches(), result.count(), tookInMillis, result.reducedFacets(), result.reducedAggregations() ); } } diff --git a/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/src/main/java/org/elasticsearch/percolator/PercolateContext.java index 4c29cfa7fe4..496d938d3ab 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolateContext.java +++ b/src/main/java/org/elasticsearch/percolator/PercolateContext.java @@ -108,6 +108,7 @@ public class PercolateContext extends SearchContext { private Query percolateQuery; private FetchSubPhase.HitContext hitContext; private SearchContextFacets facets; + private SearchContextAggregations aggregations; private QuerySearchResult querySearchResult; public PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, IndexShard indexShard, IndexService indexService, CacheRecycler cacheRecycler) { @@ -286,12 +287,13 @@ public class PercolateContext extends SearchContext { @Override public SearchContextAggregations aggregations() { - throw new UnsupportedOperationException(); + return aggregations; } @Override public SearchContext aggregations(SearchContextAggregations aggregations) { - throw new UnsupportedOperationException(); + this.aggregations = aggregations; + return this; } @Override diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 0ea0d6cc3e5..4c98087482e 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -77,6 +77,8 @@ import org.elasticsearch.percolator.QueryCollector.MatchAndScore; import org.elasticsearch.percolator.QueryCollector.MatchAndSort; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.AggregationPhase; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.facet.Facet; import org.elasticsearch.search.facet.FacetPhase; import org.elasticsearch.search.facet.InternalFacet; @@ -98,7 +100,7 @@ import static org.elasticsearch.percolator.QueryCollector.*; public class PercolatorService extends AbstractComponent { public final static float NO_SCORE = Float.NEGATIVE_INFINITY; - public static final String TYPE_NAME = ".percolator"; + public final static String TYPE_NAME = ".percolator"; private final CloseableThreadLocal cache; private final IndicesService indicesService; @@ -108,15 +110,19 @@ public class PercolatorService extends AbstractComponent { private final FacetPhase facetPhase; private final HighlightPhase highlightPhase; + private final AggregationPhase aggregationPhase; @Inject - public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler, HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase) { + public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler, + HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase, + AggregationPhase aggregationPhase) { super(settings); this.indicesService = indicesService; this.cacheRecycler = cacheRecycler; this.clusterService = clusterService; this.highlightPhase = highlightPhase; this.facetPhase = facetPhase; + this.aggregationPhase = aggregationPhase; final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes(); cache = new CloseableThreadLocal() { @@ -166,7 +172,7 @@ public class PercolatorService extends AbstractComponent { throw new ElasticSearchIllegalArgumentException("Nothing to percolate"); } - if (context.percolateQuery() == null && (context.score || context.sort || context.facets() != null)) { + if (context.percolateQuery() == null && (context.score || context.sort || context.facets() != null || context.aggregations() != null)) { context.percolateQuery(new MatchAllDocsQuery()); } @@ -228,8 +234,10 @@ public class PercolatorService extends AbstractComponent { return null; } + // TODO: combine all feature parse elements into one map Map hlElements = highlightPhase.parseElements(); Map facetElements = facetPhase.parseElements(); + Map aggregationElements = aggregationPhase.parseElements(); ParsedDocument doc = null; XContentParser parser = null; @@ -262,6 +270,9 @@ public class PercolatorService extends AbstractComponent { SearchParseElement element = hlElements.get(currentFieldName); if (element == null) { element = facetElements.get(currentFieldName); + if (element == null) { + element = aggregationElements.get(currentFieldName); + } } if ("query".equals(currentFieldName)) { @@ -389,12 +400,9 @@ public class PercolatorService extends AbstractComponent { } assert !shardResults.isEmpty(); - if (shardResults.get(0).facets() != null) { - InternalFacets reducedFacets = reduceFacets(shardResults); - return new ReduceResult(finalCount, reducedFacets); - } else { - return new ReduceResult(finalCount); - } + InternalFacets reducedFacets = reduceFacets(shardResults); + InternalAggregations reducedAggregations = reduceAggregations(shardResults); + return new ReduceResult(finalCount, reducedFacets, reducedAggregations); } @Override @@ -482,12 +490,9 @@ public class PercolatorService extends AbstractComponent { } assert !shardResults.isEmpty(); - if (shardResults.get(0).facets() != null) { - InternalFacets reducedFacets = reduceFacets(shardResults); - return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]), reducedFacets); - } else { - return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()])); - } + InternalFacets reducedFacets = reduceFacets(shardResults); + InternalAggregations reducedAggregations = reduceAggregations(shardResults); + return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]), reducedFacets, reducedAggregations); } @Override @@ -680,12 +685,9 @@ public class PercolatorService extends AbstractComponent { } assert !shardResults.isEmpty(); - if (shardResults.get(0).facets() != null) { - InternalFacets reducedFacets = reduceFacets(shardResults); - return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]), reducedFacets); - } else { - return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()])); - } + InternalFacets reducedFacets = reduceFacets(shardResults); + InternalAggregations reducedAggregations = reduceAggregations(shardResults); + return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]), reducedFacets, reducedAggregations); } @Override @@ -755,6 +757,9 @@ public class PercolatorService extends AbstractComponent { if (context.facets() != null) { facetPhase.execute(context); } + if (context.aggregations() != null) { + aggregationPhase.execute(context); + } } public final static class ReduceResult { @@ -764,29 +769,20 @@ public class PercolatorService extends AbstractComponent { private final long count; private final PercolateResponse.Match[] matches; private final InternalFacets reducedFacets; + private final InternalAggregations reducedAggregations; - ReduceResult(long count, PercolateResponse.Match[] matches, InternalFacets reducedFacets) { + ReduceResult(long count, PercolateResponse.Match[] matches, InternalFacets reducedFacets, InternalAggregations reducedAggregations) { this.count = count; this.matches = matches; this.reducedFacets = reducedFacets; + this.reducedAggregations = reducedAggregations; } - ReduceResult(long count, PercolateResponse.Match[] matches) { - this.count = count; - this.matches = matches; - this.reducedFacets = null; - } - - public ReduceResult(long count, InternalFacets reducedFacets) { + public ReduceResult(long count, InternalFacets reducedFacets, InternalAggregations reducedAggregations) { this.count = count; this.matches = EMPTY; this.reducedFacets = reducedFacets; - } - - public ReduceResult(long count) { - this.count = count; - this.matches = EMPTY; - this.reducedFacets = null; + this.reducedAggregations = reducedAggregations; } public long count() { @@ -800,9 +796,17 @@ public class PercolatorService extends AbstractComponent { public InternalFacets reducedFacets() { return reducedFacets; } + + public InternalAggregations reducedAggregations() { + return reducedAggregations; + } } private InternalFacets reduceFacets(List shardResults) { + if (shardResults.get(0).facets() == null) { + return null; + } + if (shardResults.size() == 1) { return shardResults.get(0).facets(); } @@ -828,4 +832,20 @@ public class PercolatorService extends AbstractComponent { return new InternalFacets(aggregatedFacets); } + private InternalAggregations reduceAggregations(List shardResults) { + if (shardResults.get(0).aggregations() == null) { + return null; + } + + if (shardResults.size() == 1) { + return shardResults.get(0).aggregations(); + } + + List aggregationsList = new ArrayList(shardResults.size()); + for (PercolateShardResponse shardResult : shardResults) { + aggregationsList.add(shardResult.aggregations()); + } + return InternalAggregations.reduce(aggregationsList, cacheRecycler); + } + } diff --git a/src/main/java/org/elasticsearch/percolator/QueryCollector.java b/src/main/java/org/elasticsearch/percolator/QueryCollector.java index 97e5e15f182..b32036c6f6e 100644 --- a/src/main/java/org/elasticsearch/percolator/QueryCollector.java +++ b/src/main/java/org/elasticsearch/percolator/QueryCollector.java @@ -33,6 +33,10 @@ import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.query.ParsedQuery; +import org.elasticsearch.search.aggregations.AggregationPhase; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.facet.SearchContextFacets; import org.elasticsearch.search.facet.nested.NestedFacetExecutor; import org.elasticsearch.search.highlight.HighlightField; @@ -59,7 +63,7 @@ abstract class QueryCollector extends Collector { BytesValues values; final List facetCollectors = new ArrayList(); - final Collector facetCollector; + final Collector facetAndAggregatorCollector; QueryCollector(ESLogger logger, PercolateContext context) { this.logger = logger; @@ -85,13 +89,41 @@ abstract class QueryCollector extends Collector { } } - facetCollector = facetCollectors.isEmpty() ? null : MultiCollector.wrap(facetCollectors.toArray(new Collector[facetCollectors.size()])); + List collectors = new ArrayList(facetCollectors); + if (context.aggregations() != null) { + AggregationContext aggregationContext = new AggregationContext(context); + context.aggregations().aggregationContext(aggregationContext); + + List aggregatorCollectors = new ArrayList(); + Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext); + for (int i = 0; i < aggregators.length; i++) { + if (!(aggregators[i] instanceof GlobalAggregator)) { + Aggregator aggregator = aggregators[i]; + if (aggregator.shouldCollect()) { + aggregatorCollectors.add(aggregator); + } + } + } + context.aggregations().aggregators(aggregators); + if (!aggregatorCollectors.isEmpty()) { + collectors.add(new AggregationPhase.AggregationsCollector(aggregatorCollectors, aggregationContext)); + } + } + + int size = collectors.size(); + if (size == 0) { + facetAndAggregatorCollector = null; + } else if (size == 1) { + facetAndAggregatorCollector = collectors.get(0); + } else { + facetAndAggregatorCollector = MultiCollector.wrap(collectors.toArray(new Collector[collectors.size()])); + } } @Override public void setScorer(Scorer scorer) throws IOException { - if (facetCollector != null) { - facetCollector.setScorer(scorer); + if (facetAndAggregatorCollector != null) { + facetAndAggregatorCollector.setScorer(scorer); } } @@ -99,8 +131,8 @@ abstract class QueryCollector extends Collector { public void setNextReader(AtomicReaderContext context) throws IOException { // we use the UID because id might not be indexed values = idFieldData.load(context).getBytesValues(true); - if (facetCollector != null) { - facetCollector.setNextReader(context); + if (facetAndAggregatorCollector != null) { + facetAndAggregatorCollector.setNextReader(context); } } @@ -183,8 +215,8 @@ abstract class QueryCollector extends Collector { } } counter++; - if (facetCollector != null) { - facetCollector.collect(doc); + if (facetAndAggregatorCollector != null) { + facetAndAggregatorCollector.collect(doc); } } } catch (IOException e) { @@ -228,8 +260,8 @@ abstract class QueryCollector extends Collector { searcher.search(query, collector); if (collector.exists()) { topDocsCollector.collect(doc); - if (facetCollector != null) { - facetCollector.collect(doc); + if (facetAndAggregatorCollector != null) { + facetAndAggregatorCollector.collect(doc); } } } catch (IOException e) { @@ -302,8 +334,8 @@ abstract class QueryCollector extends Collector { } } counter++; - if (facetCollector != null) { - facetCollector.collect(doc); + if (facetAndAggregatorCollector != null) { + facetAndAggregatorCollector.collect(doc); } } } catch (IOException e) { @@ -354,8 +386,8 @@ abstract class QueryCollector extends Collector { searcher.search(query, collector); if (collector.exists()) { counter++; - if (facetCollector != null) { - facetCollector.collect(doc); + if (facetAndAggregatorCollector != null) { + facetAndAggregatorCollector.collect(doc); } } } catch (IOException e) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index be95dc18c46..6b48612d52a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -136,12 +136,12 @@ public class AggregationPhase implements SearchPhase { } - static class AggregationsCollector extends XCollector { + public static class AggregationsCollector extends XCollector { private final AggregationContext aggregationContext; private final List collectors; - AggregationsCollector(List collectors, AggregationContext aggregationContext) { + public AggregationsCollector(List collectors, AggregationContext aggregationContext) { this.collectors = collectors; this.aggregationContext = aggregationContext; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 55ac287f343..784ed788fc8 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -182,6 +182,10 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl return result; } + public static InternalAggregations readOptionalAggregations(StreamInput in) throws IOException { + return in.readOptionalStreamable(new InternalAggregations()); + } + @Override public void readFrom(StreamInput in) throws IOException { int size = in.readVInt(); diff --git a/src/test/java/org/elasticsearch/percolator/PercolatorFacetsTests.java b/src/test/java/org/elasticsearch/percolator/PercolatorFacetsAndAggregationsTests.java similarity index 65% rename from src/test/java/org/elasticsearch/percolator/PercolatorFacetsTests.java rename to src/test/java/org/elasticsearch/percolator/PercolatorFacetsAndAggregationsTests.java index bc73e2f7f1b..971e8563cb1 100644 --- a/src/test/java/org/elasticsearch/percolator/PercolatorFacetsTests.java +++ b/src/test/java/org/elasticsearch/percolator/PercolatorFacetsAndAggregationsTests.java @@ -22,11 +22,17 @@ package org.elasticsearch.percolator; import org.elasticsearch.action.percolate.PercolateRequestBuilder; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.facet.FacetBuilders; import org.elasticsearch.search.facet.terms.TermsFacet; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -38,10 +44,11 @@ import static org.hamcrest.Matchers.equalTo; /** * */ -public class PercolatorFacetsTests extends ElasticsearchIntegrationTest { +public class PercolatorFacetsAndAggregationsTests extends ElasticsearchIntegrationTest { @Test - public void testFacets() throws Exception { + // Just test the integration with facets and aggregations, not the facet and aggregation functionality! + public void testFacetsAndAggregations() throws Exception { client().admin().indices().prepareCreate("test").execute().actionGet(); ensureGreen(); @@ -68,8 +75,15 @@ public class PercolatorFacetsTests extends ElasticsearchIntegrationTest { String value = values[i % numUniqueQueries]; PercolateRequestBuilder percolateRequestBuilder = client().preparePercolate() .setIndices("test").setDocumentType("type") - .setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", value).endObject())) - .addFacet(FacetBuilders.termsFacet("a").field("field2")); + .setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", value).endObject())); + + boolean useAggs = randomBoolean(); + if (useAggs) { + percolateRequestBuilder.addAggregation(AggregationBuilders.terms("a").field("field2")); + } else { + percolateRequestBuilder.addFacet(FacetBuilders.termsFacet("a").field("field2")); + + } if (randomBoolean()) { percolateRequestBuilder.setPercolateQuery(matchAllQuery()); @@ -91,11 +105,21 @@ public class PercolatorFacetsTests extends ElasticsearchIntegrationTest { assertThat(response.getMatches(), arrayWithSize(expectedCount[i % numUniqueQueries])); } - assertThat(response.getFacets().facets().size(), equalTo(1)); - assertThat(response.getFacets().facets().get(0).getName(), equalTo("a")); - assertThat(((TermsFacet) response.getFacets().facets().get(0)).getEntries().size(), equalTo(1)); - assertThat(((TermsFacet) response.getFacets().facets().get(0)).getEntries().get(0).getCount(), equalTo(expectedCount[i % values.length])); - assertThat(((TermsFacet) response.getFacets().facets().get(0)).getEntries().get(0).getTerm().string(), equalTo("b")); + if (useAggs) { + List aggregations = response.getAggregations().asList(); + assertThat(aggregations.size(), equalTo(1)); + assertThat(aggregations.get(0).getName(), equalTo("a")); + List buckets = new ArrayList(((Terms) aggregations.get(0)).buckets()); + assertThat(buckets.size(), equalTo(1)); + assertThat(buckets.get(0).getKey().string(), equalTo("b")); + assertThat(buckets.get(0).getDocCount(), equalTo((long) expectedCount[i % values.length])); + } else { + assertThat(response.getFacets().facets().size(), equalTo(1)); + assertThat(response.getFacets().facets().get(0).getName(), equalTo("a")); + assertThat(((TermsFacet) response.getFacets().facets().get(0)).getEntries().size(), equalTo(1)); + assertThat(((TermsFacet) response.getFacets().facets().get(0)).getEntries().get(0).getCount(), equalTo(expectedCount[i % values.length])); + assertThat(((TermsFacet) response.getFacets().facets().get(0)).getEntries().get(0).getTerm().string(), equalTo("b")); + } } }