diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
index 85f53ea45e3..17d93874eab 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
@@ -115,7 +115,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamable {
     /**
      * Reduces the given lists of addAggregation.
      *
-     * @param aggregationsList A list of addAggregation to reduce
+     * @param aggregationsList A list of aggregations to reduce
      * @return The reduced addAggregation
      */
     public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, CacheRecycler cacheRecycler) {
@@ -123,7 +123,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamable {
             return null;
         }

-        // first we collect all addAggregation of the same type and list them together
+        // first we collect all aggregations of the same type and list them together
         Map<String, List<InternalAggregation>> aggByName = new HashMap<String, List<InternalAggregation>>();
         for (InternalAggregations aggregations : aggregationsList) {
@@ -150,6 +150,17 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamable {
         return result;
     }

+    /**
+     * Reduces these aggregations, effectively propagating the reduce to all the sub-aggregations
+     * @param cacheRecycler
+     */
+    public void reduce(CacheRecycler cacheRecycler) {
+        for (int i = 0; i < aggregations.size(); i++) {
+            InternalAggregation aggregation = aggregations.get(i);
+            aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), cacheRecycler)));
+        }
+    }
+
     /** The fields required to write this addAggregation to xcontent */
     static class Fields {
         public static final XContentBuilderString AGGREGATIONS = new XContentBuilderString("aggregations");
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregation.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregation.java
index 1152dede753..36fbcfa451c 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregation.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregation.java
@@ -64,7 +64,9 @@ public abstract class SingleBucketAggregation<B extends SingleBucketAggregation<B>> extends InternalAggregation {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
-            return aggregations.get(0);
+            B reduced = ((B) aggregations.get(0));
+            reduced.aggregations.reduce(reduceContext.cacheRecycler());
+            return reduced;
         }
         B reduced = null;
         List<InternalAggregations> subAggregationsList = new ArrayList<InternalAggregations>(aggregations.size());
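Editor's note on the two hunks above: on a single-shard search, reduce(ReduceContext) receives exactly one shard-level aggregation, and the old fast path returned it untouched, so reduce() was never invoked on its sub-aggregations (work that only happens at reduce time, such as the date histogram's empty-bucket filling, was silently skipped). The new InternalAggregations.reduce(CacheRecycler) wraps each sub-aggregation in a singleton ReduceContext so the reduce recurses down the tree. A minimal, self-contained model of that pattern (Agg and its members are illustrative stand-ins, not the Elasticsearch classes):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class Agg {
        final List<Agg> subAggs = new ArrayList<Agg>();
        boolean reduced; // stands in for the work reduce() actually performs

        // reduce the shard-level results for this aggregation
        Agg reduce(List<Agg> shardResults) {
            if (shardResults.size() == 1) {
                Agg only = shardResults.get(0);
                only.reduced = true;
                // the fix: even with a single shard result, propagate to sub-aggregations
                for (Agg sub : only.subAggs) {
                    sub.reduce(Collections.singletonList(sub));
                }
                return only;
            }
            // multi-shard merge path elided
            return shardResults.get(0);
        }
    }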
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java
index 88aac1fd578..dd79baf99b2 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java
@@ -26,15 +26,14 @@ import java.io.IOException;

 /**
  * Creates an aggregation based on bucketing points into GeoHashes
- *
  */
 public class GeoHashGridBuilder extends AggregationBuilder<GeoHashGridBuilder> {

     private String field;
-    private int precision=GeoHashGridParser.DEFAULT_PRECISION;
-    private int requiredSize=GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
-    private int shardSize=0;
+    private int precision = GeoHashGridParser.DEFAULT_PRECISION;
+    private int requiredSize = GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
+    private int shardSize = 0;

     public GeoHashGridBuilder(String name) {
         super(name, InternalGeoHashGrid.TYPE.name());
@@ -46,18 +45,19 @@ public class GeoHashGridBuilder extends AggregationBuilder<GeoHashGridBuilder> {
     }

     public GeoHashGridBuilder precision(int precision) {
-        if((precision<1)||(precision>12))
-        {
-            throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of "+precision
-                    +"must be between 1 and 12");
+        if ((precision < 1) || (precision > 12)) {
+            throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of " + precision
+                    + ". Must be between 1 and 12");
         }
         this.precision = precision;
         return this;
     }
+
     public GeoHashGridBuilder size(int requiredSize) {
         this.requiredSize = requiredSize;
         return this;
     }
+
     public GeoHashGridBuilder shardSize(int shardSize) {
         this.shardSize = shardSize;
         return this;
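For context, typical client-side usage of this builder (a sketch; it assumes an available Client instance and the "idx"/"location" index layout used by the tests further down):

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.search.aggregations.AggregationBuilders;

    class GeoGridSearchExample {

        // assumes an "idx" index with a geo_point "location" field, as in the tests below
        static SearchResponse topCells(Client client) {
            return client.prepareSearch("idx")
                    .addAggregation(AggregationBuilders.geohashGrid("grid")
                            .field("location")
                            // precision must be 1..12; e.g. precision(13) fails fast with an
                            // ElasticsearchIllegalArgumentException before any request is sent
                            .precision(5))
                    .execute().actionGet();
        }
    }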
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java
index 282b09b180b..ef900d0dcb7 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java
@@ -102,7 +102,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashGrid {
         }

         public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
             if (buckets.size() == 1) {
-                return buckets.get(0);
+                // we still need to reduce the sub aggs
+                Bucket bucket = buckets.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             Bucket reduced = null;
             List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());
@@ -166,7 +169,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashGrid {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
-            grid.trimExcessEntries();
+            grid.trimExcessEntries(reduceContext.cacheRecycler());
             return grid;
         }
         InternalGeoHashGrid reduced = null;
@@ -227,21 +230,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashGrid {
     }

-    protected void trimExcessEntries() {
-        if (requiredSize >= buckets.size()) {
-            return;
-        }
-
-        if (buckets instanceof List) {
-            buckets = ((List) buckets).subList(0, requiredSize);
-            return;
-        }
-
+    protected void trimExcessEntries(CacheRecycler cacheRecycler) {
         int i = 0;
         for (Iterator<Bucket> iter = buckets.iterator(); iter.hasNext();) {
-            iter.next();
+            Bucket bucket = iter.next();
             if (i++ >= requiredSize) {
                 iter.remove();
+            } else {
+                bucket.aggregations.reduce(cacheRecycler);
             }
         }
     }
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java
index a57c3facbb8..543eeb4002e 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramBase.java
@@ -77,7 +77,10 @@ abstract class AbstractHistogramBase<B extends HistogramBase.Bucket> extends InternalAggregation {

         Bucket reduce(List<Bucket> buckets, CacheRecycler cacheRecycler) {
             if (buckets.size() == 1) {
-                return buckets.get(0);
+                // we only need to reduce the sub aggregations
+                Bucket bucket = buckets.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             List<InternalAggregations> aggregations = new ArrayList<InternalAggregations>(buckets.size());
             Bucket reduced = null;
@@ -172,21 +175,27 @@ abstract class AbstractHistogramBase<B extends HistogramBase.Bucket> extends InternalAggregation {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
+            AbstractHistogramBase<B> histo = (AbstractHistogramBase<B>) aggregations.get(0);
+
             if (minDocCount == 1) {
-                return aggregations.get(0);
+                for (B bucket : histo.buckets) {
+                    ((Bucket) bucket).aggregations.reduce(reduceContext.cacheRecycler());
+                }
+                return histo;
             }
-            AbstractHistogramBase histo = (AbstractHistogramBase) aggregations.get(0);
+
             CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
-            List<HistogramBase.Bucket> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
+            List<B> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
             HistogramBase.Bucket prevBucket = null;
-            ListIterator<HistogramBase.Bucket> iter = list.listIterator();
+            ListIterator<B> iter = list.listIterator();
             if (minDocCount == 0) {
                 // we need to fill the gaps with empty buckets
                 while (iter.hasNext()) {
                     // look ahead on the next bucket without advancing the iter
                     // so we'll be able to insert elements at the right position
                     HistogramBase.Bucket nextBucket = list.get(iter.nextIndex());
+                    ((Bucket) nextBucket).aggregations.reduce(reduceContext.cacheRecycler());
                     if (prevBucket != null) {
                         long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey());
                         while (key != nextBucket.getKey()) {
@@ -198,8 +207,11 @@ abstract class AbstractHistogramBase<B extends HistogramBase.Bucket> extends InternalAggregation {
                 }
             } else {
                 while (iter.hasNext()) {
-                    if (iter.next().getDocCount() < minDocCount) {
+                    Bucket bucket = (Bucket) iter.next();
+                    if (bucket.getDocCount() < minDocCount) {
                         iter.remove();
+                    } else {
+                        bucket.aggregations.reduce(reduceContext.cacheRecycler());
                     }
                 }
             }
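The minDocCount == 0 branch above is the machinery the new ShardReduceTests at the bottom of this patch rely on: during reduce, the histogram walks its sorted buckets and inserts an empty bucket for every rounding value missing between two neighbours. The idea, stripped of the Elasticsearch types (a fixed interval stands in for the Rounding.nextRoundingValue call):

    import java.util.ArrayList;
    import java.util.List;

    class GapFillingSketch {
        // insert an empty-bucket key for every interval step missing between neighbours
        static List<Long> fillGaps(List<Long> sortedKeys, long interval) {
            List<Long> filled = new ArrayList<Long>();
            Long prev = null;
            for (long next : sortedKeys) {
                if (prev != null) {
                    for (long key = prev + interval; key != next; key += interval) {
                        filled.add(key); // empty bucket, doc count 0
                    }
                }
                filled.add(next);
                prev = next;
            }
            return filled;
        }
    }

For keys [1, 2, 4] and interval 1 this yields [1, 2, 3, 4], which is exactly the shape the date histogram assertions below expect.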
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeBase.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeBase.java
index e08780bfc8c..33a5eb668e0 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeBase.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeBase.java
@@ -86,7 +86,10 @@ public abstract class AbstractRangeBase<B extends RangeBase.Bucket> extends InternalAggregation {

         Bucket reduce(List<Bucket> ranges, CacheRecycler cacheRecycler) {
             if (ranges.size() == 1) {
-                return ranges.get(0);
+                // we still need to call reduce on all the sub aggregations
+                Bucket bucket = ranges.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             Bucket reduced = null;
             List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());
@@ -196,7 +199,11 @@ public abstract class AbstractRangeBase<B extends RangeBase.Bucket> extends InternalAggregation {
     public AbstractRangeBase reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
-            return (AbstractRangeBase) aggregations.get(0);
+            AbstractRangeBase<B> reduced = (AbstractRangeBase<B>) aggregations.get(0);
+            for (B bucket : reduced.buckets()) {
+                ((Bucket) bucket).aggregations.reduce(reduceContext.cacheRecycler());
+            }
+            return reduced;
         }
         List<List<Bucket>> rangesList = null;
         for (InternalAggregation aggregation : aggregations) {
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
index b30dac7a91e..fe586b422a2 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
@@ -107,7 +107,7 @@ public class DoubleTerms extends InternalTerms {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries();
+            terms.trimExcessEntries(reduceContext.cacheRecycler());
             return terms;
         }
         InternalTerms reduced = null;
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
index c2717dfac9b..78f20416d76 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
@@ -63,7 +63,9 @@ public abstract class InternalTerms extends InternalAggregation implements Terms {

         public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
             if (buckets.size() == 1) {
-                return buckets.get(0);
+                Bucket bucket = buckets.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             Bucket reduced = null;
             List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());
@@ -124,12 +126,11 @@ public abstract class InternalTerms extends InternalAggregation implements Terms {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries();
+            terms.trimExcessEntries(reduceContext.cacheRecycler());
             return terms;
         }

-        InternalTerms reduced = null;
-
         // TODO: would it be better to use a hppc map and then directly work on the backing array instead of using a PQ?
+        InternalTerms reduced = null;
         Map<Text, List<Bucket>> buckets = null;
         for (InternalAggregation aggregation : aggregations) {
@@ -175,7 +176,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms {
         return reduced;
     }

-    final void trimExcessEntries() {
+    final void trimExcessEntries(CacheRecycler cacheRecycler) {
         final List<Bucket> newBuckets = Lists.newArrayList();
         for (Bucket b : buckets) {
             if (newBuckets.size() >= requiredSize) {
@@ -183,6 +184,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms {
             }
             if (b.docCount >= minDocCount) {
                 newBuckets.add(b);
+                b.aggregations.reduce(cacheRecycler);
             }
         }
         buckets = newBuckets;
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java
index 0bdd68ab750..e4b5a92a4ca 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java
@@ -104,7 +104,7 @@ public class LongTerms extends InternalTerms {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries();
+            terms.trimExcessEntries(reduceContext.cacheRecycler());
             return terms;
         }
         InternalTerms reduced = null;
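Both InternalGeoHashGrid and InternalTerms now thread the recycler through trimExcessEntries() so that size-trimming and sub-aggregation reduction happen in a single pass over the buckets; the geohash version also loses its subList() shortcut, which used to bypass the per-bucket walk entirely. Distilled into plain Java (Bucket here is a hypothetical stand-in exposing a reduce hook, and the minDocCount check only applies to the terms variant):

    import java.util.Iterator;
    import java.util.List;

    class TrimAndReduceSketch {
        interface Bucket {
            long docCount();
            void reduceSubAggs();
        }

        // keep at most requiredSize buckets with docCount >= minDocCount, and
        // reduce the sub-aggregations of every bucket that is kept
        static void trim(List<? extends Bucket> buckets, int requiredSize, long minDocCount) {
            int kept = 0;
            for (Iterator<? extends Bucket> it = buckets.iterator(); it.hasNext(); ) {
                Bucket b = it.next();
                if (kept >= requiredSize || b.docCount() < minDocCount) {
                    it.remove();
                } else {
                    kept++;
                    b.reduceSubAggs(); // the step the old fast path skipped
                }
            }
        }
    }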
diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java
index c544015f576..50b434ade31 100644
--- a/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java
+++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java
@@ -64,14 +64,14 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
         source = source.endObject();
         return client().prepareIndex("idx", "type").setSource(source);
     }
-
-    ObjectIntMap<String> expectedDocCountsForGeoHash=null;
-    int highestPrecisionGeohash=12;
-    int numRandomPoints=100;
-
-    String smallestGeoHash=null;
-
+
+    ObjectIntMap<String> expectedDocCountsForGeoHash = null;
+    int highestPrecisionGeohash = 12;
+    int numRandomPoints = 100;
+
+    String smallestGeoHash = null;
+
     @Before
     public void init() throws Exception {
         prepareCreate("idx")
@@ -82,24 +82,24 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {

         List<IndexRequestBuilder> cities = new ArrayList<IndexRequestBuilder>();
         Random random = getRandom();
-        expectedDocCountsForGeoHash=new ObjectIntOpenHashMap<String>(numRandomPoints*2);
+        expectedDocCountsForGeoHash = new ObjectIntOpenHashMap<String>(numRandomPoints * 2);
         for (int i = 0; i < numRandomPoints; i++) {
             //generate random point
-            double lat=(180d*random.nextDouble())-90d;
-            double lng=(360d*random.nextDouble())-180d;
-            String randomGeoHash=GeoHashUtils.encode(lat, lng,highestPrecisionGeohash);
+            double lat = (180d * random.nextDouble()) - 90d;
+            double lng = (360d * random.nextDouble()) - 180d;
+            String randomGeoHash = GeoHashUtils.encode(lat, lng, highestPrecisionGeohash);
             //Index at the highest resolution
-            cities.add(indexCity(randomGeoHash, lat+", "+lng));
-            expectedDocCountsForGeoHash.put(randomGeoHash, expectedDocCountsForGeoHash.getOrDefault(randomGeoHash, 0)+1);
+            cities.add(indexCity(randomGeoHash, lat + ", " + lng));
+            expectedDocCountsForGeoHash.put(randomGeoHash, expectedDocCountsForGeoHash.getOrDefault(randomGeoHash, 0) + 1);
             //Update expected doc counts for all resolutions..
-            for (int precision = highestPrecisionGeohash-1; precision >0; precision--) {
-                String hash=GeoHashUtils.encode(lat, lng,precision);
-                if((smallestGeoHash==null)||(hash.length()<smallestGeoHash.length())) {
-                    smallestGeoHash=hash;
+            for (int precision = highestPrecisionGeohash - 1; precision > 0; precision--) {
+                String hash = GeoHashUtils.encode(lat, lng, precision);
+                if ((smallestGeoHash == null) || (hash.length() < smallestGeoHash.length())) {
+                    smallestGeoHash = hash;
                 }
-                expectedDocCountsForGeoHash.put(hash, expectedDocCountsForGeoHash.getOrDefault(hash, 0)+1);
+                expectedDocCountsForGeoHash.put(hash, expectedDocCountsForGeoHash.getOrDefault(hash, 0) + 1);
             }
-        }
+        }
         indexRandom(true, cities);
     }
@@ -111,23 +111,24 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
                     .addAggregation(geohashGrid("geohashgrid")
                             .field("location")
                             .precision(precision)
-                    )
-                    .execute().actionGet();
+                    )
+                    .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));
             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
-            for (GeoHashGrid.Bucket cell : geoGrid ){
-                String geohash=cell.getGeoHash();
+            for (GeoHashGrid.Bucket cell : geoGrid) {
+                String geohash = cell.getGeoHash();

-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = expectedDocCountsForGeoHash.get(geohash);
                 assertNotSame(bucketCount, 0);
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);
             }
         }
     }

+    @Test
     public void filtered() throws Exception {
         GeoBoundingBoxFilterBuilder bbox = new GeoBoundingBoxFilterBuilder("location");
@@ -136,79 +137,79 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(
                         AggregationBuilders.filter("filtered").filter(bbox)
-                        .subAggregation(
-                                geohashGrid("geohashgrid")
-                                .field("location")
-                                .precision(precision)
-                        )
-                )
-                .execute().actionGet();
+                                .subAggregation(
+                                        geohashGrid("geohashgrid")
+                                                .field("location")
+                                                .precision(precision)
+                                )
+                )
+                .execute().actionGet();

         assertThat(response.getFailedShards(), equalTo(0));
-
-
-        Filter filter =response.getAggregations().get("filtered");
-        GeoHashGrid geoGrid = filter.getAggregations().get("geohashgrid");
-        for (GeoHashGrid.Bucket cell : geoGrid ){
+
+        Filter filter = response.getAggregations().get("filtered");
+
+        GeoHashGrid geoGrid = filter.getAggregations().get("geohashgrid");
+        for (GeoHashGrid.Bucket cell : geoGrid) {
             String geohash = cell.getGeoHash();
-            long bucketCount=cell.getDocCount();
-            int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
+            long bucketCount = cell.getDocCount();
+            int expectedBucketCount = expectedDocCountsForGeoHash.get(geohash);
             assertNotSame(bucketCount, 0);
             assertTrue("Buckets must be filtered", geohash.startsWith(smallestGeoHash));
-            assertEquals("Geohash "+geohash+" has wrong doc count ",
-                    expectedBucketCount,bucketCount);
-
+            assertEquals("Geohash " + geohash + " has wrong doc count ",
+                    expectedBucketCount, bucketCount);
+
         }
     }
-
+
     @Test
     public void unmapped() throws Exception {
         client().admin().cluster().prepareHealth("idx_unmapped").setWaitForYellowStatus().execute().actionGet();
-
-
+
+
         for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
             SearchResponse response = client().prepareSearch("idx_unmapped")
                     .addAggregation(geohashGrid("geohashgrid")
                             .field("location")
                             .precision(precision)
-                    )
-                    .execute().actionGet();
+                    )
+                    .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));
             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
             assertThat(geoGrid.getNumberOfBuckets(), equalTo(0));
-        }
+        }
     }
-
+
     @Test
     public void partiallyUnmapped() throws Exception {
         for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
-            SearchResponse response = client().prepareSearch("idx","idx_unmapped")
+            SearchResponse response = client().prepareSearch("idx", "idx_unmapped")
                     .addAggregation(geohashGrid("geohashgrid")
                             .field("location")
                             .precision(precision)
-                    )
-                    .execute().actionGet();
+                    )
+                    .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));
             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
-            for (GeoHashGrid.Bucket cell : geoGrid ){
-                String geohash=cell.getGeoHash();
+            for (GeoHashGrid.Bucket cell : geoGrid) {
+                String geohash = cell.getGeoHash();

-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = expectedDocCountsForGeoHash.get(geohash);
                 assertNotSame(bucketCount, 0);
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);
             }
         }
-    }
-
+    }
+
     @Test
     public void testTopMatch() throws Exception {
         for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
@@ -218,29 +219,28 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
                             .size(1)
                             .shardSize(100)
                             .precision(precision)
-                    )
-                    .execute().actionGet();
+                    )
+                    .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));
             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
             //Check we only have one bucket with the best match for that resolution
             assertThat(geoGrid.getNumberOfBuckets(), equalTo(1));
-            for (GeoHashGrid.Bucket cell : geoGrid ){
-                String geohash=cell.getGeoHash();
-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=0;
+            for (GeoHashGrid.Bucket cell : geoGrid) {
+                String geohash = cell.getGeoHash();
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = 0;
                 for (ObjectIntCursor<String> cursor : expectedDocCountsForGeoHash) {
-                    if(cursor.key.length()==precision)
-                    {
-                        expectedBucketCount=Math.max(expectedBucketCount, cursor.value);
+                    if (cursor.key.length() == precision) {
+                        expectedBucketCount = Math.max(expectedBucketCount, cursor.value);
                     }
                 }
                 assertNotSame(bucketCount, 0);
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);
             }
         }
-    }
+    }
 }
diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardReduceTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardReduceTests.java
new file mode 100644
index 00000000000..026a80683a6
--- /dev/null
+++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardReduceTests.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.geo.GeoHashUtils;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.bucket.filter.Filter;
+import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGrid;
+import org.elasticsearch.search.aggregations.bucket.global.Global;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.missing.Missing;
+import org.elasticsearch.search.aggregations.bucket.nested.Nested;
+import org.elasticsearch.search.aggregations.bucket.range.Range;
+import org.elasticsearch.search.aggregations.bucket.range.date.DateRange;
+import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4Range;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests making sure that the reduce is propagated to all aggregations in the hierarchy when executing on a single shard.
+ * These tests are based on the date histogram in combination with min_doc_count=0. In order for the date histogram to
+ * compute empty buckets, its {@code reduce()} method must be called. So by adding the date histogram under other buckets,
+ * we can make sure that the reduce is properly propagated by checking that empty buckets were created.
+ */
+public class ShardReduceTests extends ElasticsearchIntegrationTest {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return ImmutableSettings.builder()
+                .put("index.number_of_shards", randomBoolean() ? 1 : randomIntBetween(2, 10))
+                .put("index.number_of_replicas", randomIntBetween(0, 1))
+                .build();
+    }
+
+    private IndexRequestBuilder indexDoc(String date, int value) throws Exception {
+        return client().prepareIndex("idx", "type").setSource(jsonBuilder()
+                .startObject()
+                .field("value", value)
+                .field("ip", "10.0.0." + value)
+                .field("location", GeoHashUtils.encode(52, 5, 12))
+                .field("date", date)
+                .field("term-l", 1)
+                .field("term-d", 1.5)
+                .field("term-s", "term")
+                .startObject("nested")
+                .field("date", date)
+                .endObject()
+                .endObject());
+    }
+
+    @Before
+    public void init() throws Exception {
+        prepareCreate("idx")
+                .addMapping("type", "nested", "type=nested", "ip", "type=ip", "location", "type=geo_point")
+                .setSettings(indexSettings())
+                .execute().actionGet();
+
+        indexRandom(true,
+                indexDoc("2014-01-01", 1),
+                indexDoc("2014-01-02", 2),
+                indexDoc("2014-01-04", 3));
+        ensureSearchable();
+    }
+
+    @Test
+    public void testGlobal() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(global("global")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Global global = response.getAggregations().get("global");
+        DateHistogram histo = global.getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Filter filter = response.getAggregations().get("filter");
+        DateHistogram histo = filter.getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testMissing() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(missing("missing").field("foobar")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Missing missing = response.getAggregations().get("missing");
+        DateHistogram histo = missing.getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testGlobalWithFilterWithMissing() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(global("global")
+                        .subAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
+                                .subAggregation(missing("missing").field("foobar")
+                                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Global global = response.getAggregations().get("global");
+        Filter filter = global.getAggregations().get("filter");
+        Missing missing = filter.getAggregations().get("missing");
+        DateHistogram histo = missing.getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testNested() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(nested("nested").path("nested")
+                        .subAggregation(dateHistogram("histo").field("nested.date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Nested nested = response.getAggregations().get("nested");
+        DateHistogram histo = nested.getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testStringTerms() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(terms("terms").field("term-s")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        DateHistogram histo = terms.getByTerm("term").getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testLongTerms() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(terms("terms").field("term-l")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        DateHistogram histo = terms.getByTerm("1").getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testDoubleTerms() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(terms("terms").field("term-d")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        DateHistogram histo = terms.getByTerm("1.5").getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testRange() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(range("range").field("value").addRange("r1", 0, 10)
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Range range = response.getAggregations().get("range");
+        DateHistogram histo = range.getByKey("r1").getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testDateRange() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(dateRange("range").field("date").addRange("r1", "2014-01-01", "2014-01-10")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        DateRange range = response.getAggregations().get("range");
+        DateHistogram histo = range.getByKey("r1").getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testIpRange() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(ipRange("range").field("ip").addRange("r1", "10.0.0.1", "10.0.0.10")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        IPv4Range range = response.getAggregations().get("range");
+        DateHistogram histo = range.getByKey("r1").getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testHistogram() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(histogram("topHisto").field("value").interval(5)
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Histogram topHisto = response.getAggregations().get("topHisto");
+        DateHistogram histo = topHisto.getByKey(0).getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testDateHistogram() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(dateHistogram("topHisto").field("date").interval(DateHistogram.Interval.MONTH)
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        DateHistogram topHisto = response.getAggregations().get("topHisto");
+        DateHistogram histo = topHisto.iterator().next().getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+    @Test
+    public void testGeoHashGrid() throws Exception {
+
+        SearchResponse response = client().prepareSearch("idx")
+                .setQuery(QueryBuilders.matchAllQuery())
+                .addAggregation(geohashGrid("grid").field("location")
+                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        GeoHashGrid grid = response.getAggregations().get("grid");
+        DateHistogram histo = grid.iterator().next().getAggregations().get("histo");
+        assertThat(histo.buckets().size(), equalTo(4));
+    }
+
+
+}
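A closing note on the expected counts: the three docs indexed in init() are dated 2014-01-01, 2014-01-02 and 2014-01-04, so with a DAY interval and minDocCount(0) a correctly propagated reduce must produce four buckets, including an empty 2014-01-03; if the reduce is skipped, the gap is never filled and the assertions fail. A dependency-free sanity check of that arithmetic (plain Java, not part of the patch):

    import java.time.LocalDate;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ExpectedBuckets {
        public static void main(String[] args) {
            LocalDate[] docs = {
                    LocalDate.parse("2014-01-01"),
                    LocalDate.parse("2014-01-02"),
                    LocalDate.parse("2014-01-04")
            };
            Map<LocalDate, Integer> buckets = new LinkedHashMap<LocalDate, Integer>();
            // seed a bucket for every day between the min and max key -- the
            // gap-filling that only happens if the date histogram's reduce() runs
            for (LocalDate d = docs[0]; !d.isAfter(docs[2]); d = d.plusDays(1)) {
                buckets.put(d, 0);
            }
            for (LocalDate d : docs) {
                buckets.merge(d, 1, Integer::sum);
            }
            System.out.println(buckets.size()); // 4 -- matches equalTo(4) in the tests
        }
    }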