Fixed an issue where, when sub aggregations were executing on a single shard, the reduce call was not properly propagated down the aggregation hierarchy.
Closes #4843
This commit is contained in:
parent
93a8e80aff
commit
da953700f4
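The pattern behind the fix, as a hedged, self-contained sketch (the Agg and BucketAgg types below are illustrative stand-ins, not the Elasticsearch classes touched by this commit): when the results come from a single shard, the aggregation can no longer be returned as-is, because doing so skips the reduce of everything nested under it.

    import java.util.Collections;
    import java.util.List;

    /** Illustrative stand-in for an internal aggregation. */
    interface Agg {
        /** Merges this aggregation's per-shard results into one result. */
        Agg reduce(List<Agg> shardResults);
    }

    /** A bucket aggregation that owns sub aggregations. */
    class BucketAgg implements Agg {
        final List<Agg> subAggregations;

        BucketAgg(List<Agg> subAggregations) {
            this.subAggregations = subAggregations;
        }

        @Override
        public Agg reduce(List<Agg> shardResults) {
            if (shardResults.size() == 1) {
                BucketAgg single = (BucketAgg) shardResults.get(0);
                // Before the fix: "return single;" -- the sub aggregations never saw a reduce call.
                // After the fix: the reduce is still pushed down the hierarchy.
                for (int i = 0; i < single.subAggregations.size(); i++) {
                    Agg sub = single.subAggregations.get(i);
                    single.subAggregations.set(i, sub.reduce(Collections.singletonList(sub)));
                }
                return single;
            }
            // The multi-shard path (merging buckets across shards) is unchanged and omitted here.
            throw new UnsupportedOperationException("multi-shard merge omitted in this sketch");
        }
    }
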
@@ -115,7 +115,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
     /**
      * Reduces the given lists of addAggregation.
      *
-     * @param aggregationsList A list of addAggregation to reduce
+     * @param aggregationsList A list of aggregation to reduce
      * @return The reduced addAggregation
      */
     public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, CacheRecycler cacheRecycler) {

@@ -123,7 +123,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
             return null;
         }

-        // first we collect all addAggregation of the same type and list them together
+        // first we collect all aggregations of the same type and list them together

         Map<String, List<InternalAggregation>> aggByName = new HashMap<String, List<InternalAggregation>>();
         for (InternalAggregations aggregations : aggregationsList) {

@@ -150,6 +150,17 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
         return result;
     }

+    /**
+     * Reduces this aggregations, effectively propagates the reduce to all the sub aggregations
+     * @param cacheRecycler
+     */
+    public void reduce(CacheRecycler cacheRecycler) {
+        for (int i = 0; i < aggregations.size(); i++) {
+            InternalAggregation aggregation = aggregations.get(i);
+            aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), cacheRecycler)));
+        }
+    }
+
     /** The fields required to write this addAggregation to xcontent */
     static class Fields {
         public static final XContentBuilderString AGGREGATIONS = new XContentBuilderString("aggregations");

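The propagation bottoms out in the new InternalAggregations.reduce(CacheRecycler) above, which reduces every sub aggregation against a context containing only itself. A minimal, self-contained sketch of that idea, using illustrative names rather than the real Elasticsearch types:

    import java.util.Collections;
    import java.util.List;

    /** Illustrative stand-in for an internal aggregation that can merge per-shard results. */
    interface Reducible {
        Reducible reduce(List<Reducible> shardResults);
    }

    /** Mirrors the shape of the new reduce(CacheRecycler): each entry is reduced against itself. */
    final class SubAggregationsSketch {
        static void reduceInPlace(List<Reducible> aggregations) {
            for (int i = 0; i < aggregations.size(); i++) {
                Reducible aggregation = aggregations.get(i);
                // The singleton list plays the role of ImmutableList.of(aggregation) in the commit.
                aggregations.set(i, aggregation.reduce(Collections.singletonList(aggregation)));
            }
        }
    }
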
@@ -64,7 +64,9 @@ public abstract class SingleBucketAggregation<B extends SingleBucketAggregation<
     public InternalAggregation reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
-            return aggregations.get(0);
+            B reduced = ((B) aggregations.get(0));
+            reduced.aggregations.reduce(reduceContext.cacheRecycler());
+            return reduced;
         }
         B reduced = null;
         List<InternalAggregations> subAggregationsList = new ArrayList<InternalAggregations>(aggregations.size());

@@ -26,15 +26,14 @@ import java.io.IOException;

 /**
  * Creates an aggregation based on bucketing points into GeoHashes
  *
  */
 public class GeoHashGridBuilder extends AggregationBuilder<GeoHashGridBuilder> {

-
     private String field;
-    private int precision=GeoHashGridParser.DEFAULT_PRECISION;
-    private int requiredSize=GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
-    private int shardSize=0;
+    private int precision = GeoHashGridParser.DEFAULT_PRECISION;
+    private int requiredSize = GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
+    private int shardSize = 0;

     public GeoHashGridBuilder(String name) {
         super(name, InternalGeoHashGrid.TYPE.name());

@@ -46,18 +45,19 @@ public class GeoHashGridBuilder extends AggregationBuilder<GeoHashGridBuilder> {
     }

     public GeoHashGridBuilder precision(int precision) {
-        if((precision<1)||(precision>12))
-        {
-            throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of "+precision
-                    +"must be between 1 and 12");
+        if ((precision < 1) || (precision > 12)) {
+            throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of " + precision
+                    + "must be between 1 and 12");
         }
         this.precision = precision;
         return this;
     }
+
     public GeoHashGridBuilder size(int requiredSize) {
         this.requiredSize = requiredSize;
         return this;
     }
+
     public GeoHashGridBuilder shardSize(int shardSize) {
         this.shardSize = shardSize;
         return this;

@@ -102,7 +102,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
         }
         public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
             if (buckets.size() == 1) {
-                return buckets.get(0);
+                // we still need to reduce the sub aggs
+                Bucket bucket = buckets.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             Bucket reduced = null;
             List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());

@@ -166,7 +169,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
-            grid.trimExcessEntries();
+            grid.trimExcessEntries(reduceContext.cacheRecycler());
             return grid;
         }
         InternalGeoHashGrid reduced = null;

@@ -227,21 +230,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
     }


-    protected void trimExcessEntries() {
-        if (requiredSize >= buckets.size()) {
-            return;
-        }
-
-        if (buckets instanceof List) {
-            buckets = ((List) buckets).subList(0, requiredSize);
-            return;
-        }
-
+    protected void trimExcessEntries(CacheRecycler cacheRecycler) {
         int i = 0;
         for (Iterator<Bucket> iter = buckets.iterator(); iter.hasNext();) {
-            iter.next();
+            Bucket bucket = iter.next();
             if (i++ >= requiredSize) {
                 iter.remove();
+            } else {
+                bucket.aggregations.reduce(cacheRecycler);
             }
         }
     }

@@ -77,7 +77,10 @@ abstract class AbstractHistogramBase<B extends HistogramBase.Bucket> extends Int

         Bucket reduce(List<Bucket> buckets, CacheRecycler cacheRecycler) {
             if (buckets.size() == 1) {
-                return buckets.get(0);
+                // we only need to reduce the sub aggregations
+                Bucket bucket = buckets.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             List<InternalAggregations> aggregations = new ArrayList<InternalAggregations>(buckets.size());
             Bucket reduced = null;

@@ -172,21 +175,27 @@ abstract class AbstractHistogramBase<B extends HistogramBase.Bucket> extends Int
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {

+            AbstractHistogramBase<B> histo = (AbstractHistogramBase<B>) aggregations.get(0);
+
             if (minDocCount == 1) {
-                return aggregations.get(0);
+                for (B bucket : histo.buckets) {
+                    ((Bucket) bucket).aggregations.reduce(reduceContext.cacheRecycler());
+                }
+                return histo;
             }

-            AbstractHistogramBase histo = (AbstractHistogramBase) aggregations.get(0);

             CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
-            List<HistogramBase.Bucket> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
+            List<B> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
             HistogramBase.Bucket prevBucket = null;
-            ListIterator<HistogramBase.Bucket> iter = list.listIterator();
+            ListIterator<B> iter = list.listIterator();
             if (minDocCount == 0) {
                 // we need to fill the gaps with empty buckets
                 while (iter.hasNext()) {
                     // look ahead on the next bucket without advancing the iter
                     // so we'll be able to insert elements at the right position
                     HistogramBase.Bucket nextBucket = list.get(iter.nextIndex());
+                    ((Bucket) nextBucket).aggregations.reduce(reduceContext.cacheRecycler());
                     if (prevBucket != null) {
                         long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.getKey());
                         while (key != nextBucket.getKey()) {

@@ -198,8 +207,11 @@ abstract class AbstractHistogramBase<B extends HistogramBase.Bucket> extends Int
             }
         } else {
             while (iter.hasNext()) {
-                if (iter.next().getDocCount() < minDocCount) {
+                Bucket bucket = (Bucket) iter.next();
+                if (bucket.getDocCount() < minDocCount) {
                     iter.remove();
+                } else {
+                    bucket.aggregations.reduce(reduceContext.cacheRecycler());
                 }
             }
         }

@@ -86,7 +86,10 @@ public abstract class AbstractRangeBase<B extends RangeBase.Bucket> extends Inte

         Bucket reduce(List<Bucket> ranges, CacheRecycler cacheRecycler) {
             if (ranges.size() == 1) {
-                return ranges.get(0);
+                // we stil need to call reduce on all the sub aggregations
+                Bucket bucket = ranges.get(0);
+                bucket.aggregations.reduce(cacheRecycler);
+                return bucket;
             }
             Bucket reduced = null;
             List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());

@@ -196,7 +199,11 @@ public abstract class AbstractRangeBase<B extends RangeBase.Bucket> extends Inte
     public AbstractRangeBase reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
-            return (AbstractRangeBase) aggregations.get(0);
+            AbstractRangeBase<B> reduced = (AbstractRangeBase<B>) aggregations.get(0);
+            for (B bucket : reduced.buckets()) {
+                ((Bucket) bucket).aggregations.reduce(reduceContext.cacheRecycler());
+            }
+            return reduced;
         }
         List<List<Bucket>> rangesList = null;
         for (InternalAggregation aggregation : aggregations) {

@@ -107,7 +107,7 @@ public class DoubleTerms extends InternalTerms {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries();
+            terms.trimExcessEntries(reduceContext.cacheRecycler());
             return terms;
         }
         InternalTerms reduced = null;

@@ -63,7 +63,9 @@ public abstract class InternalTerms extends InternalAggregation implements Terms

     public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
         if (buckets.size() == 1) {
-            return buckets.get(0);
+            Bucket bucket = buckets.get(0);
+            bucket.aggregations.reduce(cacheRecycler);
+            return bucket;
         }
         Bucket reduced = null;
         List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());

@@ -124,12 +126,11 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries();
+            terms.trimExcessEntries(reduceContext.cacheRecycler());
             return terms;
         }
-        InternalTerms reduced = null;
-
         // TODO: would it be better to use a hppc map and then directly work on the backing array instead of using a PQ?
+        InternalTerms reduced = null;

         Map<Text, List<InternalTerms.Bucket>> buckets = null;
         for (InternalAggregation aggregation : aggregations) {

@@ -175,7 +176,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
         return reduced;
     }

-    final void trimExcessEntries() {
+    final void trimExcessEntries(CacheRecycler cacheRecycler) {
         final List<Bucket> newBuckets = Lists.newArrayList();
         for (Bucket b : buckets) {
             if (newBuckets.size() >= requiredSize) {

@@ -183,6 +184,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
             }
             if (b.docCount >= minDocCount) {
                 newBuckets.add(b);
+                b.aggregations.reduce(cacheRecycler);
             }
         }
         buckets = newBuckets;

@@ -104,7 +104,7 @@ public class LongTerms extends InternalTerms {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
         if (aggregations.size() == 1) {
             InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries();
+            terms.trimExcessEntries(reduceContext.cacheRecycler());
             return terms;
         }
         InternalTerms reduced = null;

@@ -64,14 +64,14 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
         source = source.endObject();
         return client().prepareIndex("idx", "type").setSource(source);
     }

-
-    ObjectIntMap<String>expectedDocCountsForGeoHash=null;
-    int highestPrecisionGeohash=12;
-    int numRandomPoints=100;
-
-    String smallestGeoHash=null;
-
+
+    ObjectIntMap<String> expectedDocCountsForGeoHash = null;
+    int highestPrecisionGeohash = 12;
+    int numRandomPoints = 100;
+
+    String smallestGeoHash = null;
+
     @Before
     public void init() throws Exception {
         prepareCreate("idx")

@@ -82,24 +82,24 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {

         List<IndexRequestBuilder> cities = new ArrayList<IndexRequestBuilder>();
         Random random = getRandom();
-        expectedDocCountsForGeoHash=new ObjectIntOpenHashMap<String>(numRandomPoints*2);
+        expectedDocCountsForGeoHash = new ObjectIntOpenHashMap<String>(numRandomPoints * 2);
         for (int i = 0; i < numRandomPoints; i++) {
             //generate random point
-            double lat=(180d*random.nextDouble())-90d;
-            double lng=(360d*random.nextDouble())-180d;
-            String randomGeoHash=GeoHashUtils.encode(lat, lng,highestPrecisionGeohash);
+            double lat = (180d * random.nextDouble()) - 90d;
+            double lng = (360d * random.nextDouble()) - 180d;
+            String randomGeoHash = GeoHashUtils.encode(lat, lng, highestPrecisionGeohash);
             //Index at the highest resolution
-            cities.add(indexCity(randomGeoHash, lat+", "+lng));
-            expectedDocCountsForGeoHash.put(randomGeoHash, expectedDocCountsForGeoHash.getOrDefault(randomGeoHash, 0)+1);
+            cities.add(indexCity(randomGeoHash, lat + ", " + lng));
+            expectedDocCountsForGeoHash.put(randomGeoHash, expectedDocCountsForGeoHash.getOrDefault(randomGeoHash, 0) + 1);
             //Update expected doc counts for all resolutions..
-            for (int precision = highestPrecisionGeohash-1; precision >0; precision--) {
-                String hash=GeoHashUtils.encode(lat, lng,precision);
-                if((smallestGeoHash==null)||(hash.length()<smallestGeoHash.length())) {
-                    smallestGeoHash=hash;
+            for (int precision = highestPrecisionGeohash - 1; precision > 0; precision--) {
+                String hash = GeoHashUtils.encode(lat, lng, precision);
+                if ((smallestGeoHash == null) || (hash.length() < smallestGeoHash.length())) {
+                    smallestGeoHash = hash;
                 }
-                expectedDocCountsForGeoHash.put(hash, expectedDocCountsForGeoHash.getOrDefault(hash, 0)+1);
+                expectedDocCountsForGeoHash.put(hash, expectedDocCountsForGeoHash.getOrDefault(hash, 0) + 1);
             }
         }
         indexRandom(true, cities);
     }

@@ -111,23 +111,24 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
                     .addAggregation(geohashGrid("geohashgrid")
                             .field("location")
                             .precision(precision)
                     )
                     .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));

             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
-            for (GeoHashGrid.Bucket cell : geoGrid ){
-                String geohash=cell.getGeoHash();
+            for (GeoHashGrid.Bucket cell : geoGrid) {
+                String geohash = cell.getGeoHash();

-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = expectedDocCountsForGeoHash.get(geohash);
                 assertNotSame(bucketCount, 0);
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);
             }
         }
     }

     @Test
     public void filtered() throws Exception {
         GeoBoundingBoxFilterBuilder bbox = new GeoBoundingBoxFilterBuilder("location");

@@ -136,79 +137,79 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
             SearchResponse response = client().prepareSearch("idx")
                     .addAggregation(
                             AggregationBuilders.filter("filtered").filter(bbox)
                                     .subAggregation(
                                             geohashGrid("geohashgrid")
                                                     .field("location")
                                                     .precision(precision)
                                     )
                     )
                     .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));

-            Filter filter =response.getAggregations().get("filtered");
-
-            GeoHashGrid geoGrid = filter.getAggregations().get("geohashgrid");
-            for (GeoHashGrid.Bucket cell : geoGrid ){
+            Filter filter = response.getAggregations().get("filtered");
+
+            GeoHashGrid geoGrid = filter.getAggregations().get("geohashgrid");
+            for (GeoHashGrid.Bucket cell : geoGrid) {
                 String geohash = cell.getGeoHash();
-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = expectedDocCountsForGeoHash.get(geohash);
                 assertNotSame(bucketCount, 0);
                 assertTrue("Buckets must be filtered", geohash.startsWith(smallestGeoHash));
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);

             }
         }
     }

     @Test
     public void unmapped() throws Exception {
         client().admin().cluster().prepareHealth("idx_unmapped").setWaitForYellowStatus().execute().actionGet();

         for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
             SearchResponse response = client().prepareSearch("idx_unmapped")
                     .addAggregation(geohashGrid("geohashgrid")
                             .field("location")
                             .precision(precision)
                     )
                     .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));

             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
             assertThat(geoGrid.getNumberOfBuckets(), equalTo(0));
         }
     }

     @Test
     public void partiallyUnmapped() throws Exception {
         for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
-            SearchResponse response = client().prepareSearch("idx","idx_unmapped")
+            SearchResponse response = client().prepareSearch("idx", "idx_unmapped")
                     .addAggregation(geohashGrid("geohashgrid")
                             .field("location")
                             .precision(precision)
                     )
                     .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));

             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
-            for (GeoHashGrid.Bucket cell : geoGrid ){
-                String geohash=cell.getGeoHash();
+            for (GeoHashGrid.Bucket cell : geoGrid) {
+                String geohash = cell.getGeoHash();

-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = expectedDocCountsForGeoHash.get(geohash);
                 assertNotSame(bucketCount, 0);
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);
             }
         }
     }

     @Test
     public void testTopMatch() throws Exception {
         for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {

@@ -218,29 +219,28 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
                             .size(1)
                             .shardSize(100)
                             .precision(precision)
                     )
                     .execute().actionGet();

             assertThat(response.getFailedShards(), equalTo(0));

             GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
             //Check we only have one bucket with the best match for that resolution
             assertThat(geoGrid.getNumberOfBuckets(), equalTo(1));
-            for (GeoHashGrid.Bucket cell : geoGrid ){
-                String geohash=cell.getGeoHash();
-                long bucketCount=cell.getDocCount();
-                int expectedBucketCount=0;
+            for (GeoHashGrid.Bucket cell : geoGrid) {
+                String geohash = cell.getGeoHash();
+                long bucketCount = cell.getDocCount();
+                int expectedBucketCount = 0;
                 for (ObjectIntCursor<String> cursor : expectedDocCountsForGeoHash) {
-                    if(cursor.key.length()==precision)
-                    {
-                        expectedBucketCount=Math.max(expectedBucketCount, cursor.value);
+                    if (cursor.key.length() == precision) {
+                        expectedBucketCount = Math.max(expectedBucketCount, cursor.value);
                     }
                 }
                 assertNotSame(bucketCount, 0);
-                assertEquals("Geohash "+geohash+" has wrong doc count ",
-                        expectedBucketCount,bucketCount);
+                assertEquals("Geohash " + geohash + " has wrong doc count ",
+                        expectedBucketCount, bucketCount);
             }
         }
     }

 }

@@ -0,0 +1,325 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.geo.GeoHashUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.global.Global;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.missing.Missing;
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.range.date.DateRange;
import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4Range;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Before;
import org.junit.Test;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;

/**
 * Tests making sure that the reduce is propagated to all aggregations in the hierarchy when executing on a single shard
 * These tests are based on the date histogram in combination of min_doc_count=0. In order for the date histogram to
 * compute empty buckets, its {@code reduce()} method must be called. So by adding the date histogram under other buckets,
 * we can make sure that the reduce is properly propagated by checking that empty buckets were created.
 */
public class ShardReduceTests extends ElasticsearchIntegrationTest {

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return ImmutableSettings.builder()
                .put("index.number_of_shards", randomBoolean() ? 1 : randomIntBetween(2, 10))
                .put("index.number_of_replicas", randomIntBetween(0, 1))
                .build();
    }

    private IndexRequestBuilder indexDoc(String date, int value) throws Exception {
        return client().prepareIndex("idx", "type").setSource(jsonBuilder()
                .startObject()
                .field("value", value)
                .field("ip", "10.0.0." + value)
                .field("location", GeoHashUtils.encode(52, 5, 12))
                .field("date", date)
                .field("term-l", 1)
                .field("term-d", 1.5)
                .field("term-s", "term")
                .startObject("nested")
                .field("date", date)
                .endObject()
                .endObject());
    }

    @Before
    public void init() throws Exception {
        prepareCreate("idx")
                .addMapping("type", "nested", "type=nested", "ip", "type=ip", "location", "type=geo_point")
                .setSettings(indexSettings())
                .execute().actionGet();

        indexRandom(true,
                indexDoc("2014-01-01", 1),
                indexDoc("2014-01-02", 2),
                indexDoc("2014-01-04", 3));
        ensureSearchable();
    }

    @Test
    public void testGlobal() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(global("global")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Global global = response.getAggregations().get("global");
        DateHistogram histo = global.getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testFilter() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Filter filter = response.getAggregations().get("filter");
        DateHistogram histo = filter.getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testMissing() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(missing("missing").field("foobar")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Missing missing = response.getAggregations().get("missing");
        DateHistogram histo = missing.getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testGlobalWithFilterWithMissing() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(global("global")
                        .subAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
                                .subAggregation(missing("missing").field("foobar")
                                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))))
                .execute().actionGet();

        assertSearchResponse(response);

        Global global = response.getAggregations().get("global");
        Filter filter = global.getAggregations().get("filter");
        Missing missing = filter.getAggregations().get("missing");
        DateHistogram histo = missing.getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testNested() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(nested("nested").path("nested")
                        .subAggregation(dateHistogram("histo").field("nested.date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Nested nested = response.getAggregations().get("nested");
        DateHistogram histo = nested.getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testStringTerms() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(terms("terms").field("term-s")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        DateHistogram histo = terms.getByTerm("term").getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testLongTerms() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(terms("terms").field("term-l")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        DateHistogram histo = terms.getByTerm("1").getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testDoubleTerms() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(terms("terms").field("term-d")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        DateHistogram histo = terms.getByTerm("1.5").getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testRange() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(range("range").field("value").addRange("r1", 0, 10)
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Range range = response.getAggregations().get("range");
        DateHistogram histo = range.getByKey("r1").getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testDateRange() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(dateRange("range").field("date").addRange("r1", "2014-01-01", "2014-01-10")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        DateRange range = response.getAggregations().get("range");
        DateHistogram histo = range.getByKey("r1").getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testIpRange() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(ipRange("range").field("ip").addRange("r1", "10.0.0.1", "10.0.0.10")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        IPv4Range range = response.getAggregations().get("range");
        DateHistogram histo = range.getByKey("r1").getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testHistogram() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(histogram("topHisto").field("value").interval(5)
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        Histogram topHisto = response.getAggregations().get("topHisto");
        DateHistogram histo = topHisto.getByKey(0).getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testDateHistogram() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(dateHistogram("topHisto").field("date").interval(DateHistogram.Interval.MONTH)
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        DateHistogram topHisto = response.getAggregations().get("topHisto");
        DateHistogram histo = topHisto.iterator().next().getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }

    @Test
    public void testGeoHashGrid() throws Exception {

        SearchResponse response = client().prepareSearch("idx")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(geohashGrid("grid").field("location")
                        .subAggregation(dateHistogram("histo").field("date").interval(DateHistogram.Interval.DAY).minDocCount(0)))
                .execute().actionGet();

        assertSearchResponse(response);

        GeoHashGrid grid = response.getAggregations().get("grid");
        DateHistogram histo = grid.iterator().next().getAggregations().get("histo");
        assertThat(histo.buckets().size(), equalTo(4));
    }


}
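Why these tests can catch a missing reduce: as the class Javadoc above explains, a date histogram with min_doc_count=0 only fills its empty buckets when its reduce() runs. The toy model below (illustrative names, not the Elasticsearch API) shows the same effect with the commit's three indexed dates, 2014-01-01, 2014-01-02 and 2014-01-04.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;

    public class ReducePropagationDemo {

        static class ToyHistogram {
            final TreeSet<Long> keysWithDocs = new TreeSet<>();
            final List<Long> buckets = new ArrayList<>();

            void collect(long key) {
                keysWithDocs.add(key);
            }

            /** Empty buckets between the min and max key only appear once reduce() runs. */
            void reduce() {
                buckets.clear();
                for (long key = keysWithDocs.first(); key <= keysWithDocs.last(); key++) {
                    buckets.add(key);
                }
            }
        }

        public static void main(String[] args) {
            ToyHistogram histo = new ToyHistogram();
            histo.collect(1);   // 2014-01-01
            histo.collect(2);   // 2014-01-02
            histo.collect(4);   // 2014-01-04

            // Without reduce() the gap day is missing, which is exactly what the
            // assertions on histo.buckets().size() == 4 above would catch.
            System.out.println("before reduce: " + histo.buckets.size());  // 0
            histo.reduce();
            System.out.println("after reduce:  " + histo.buckets.size());  // 4 (keys 1, 2, 3, 4)
        }
    }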