Save memory when geo aggregations are not on top (#57483) (#57551)

Saves memory when the `geotile_grid` and `geohash_grid` are not on the
top level by using the `LongKeyedBucketOrds` we built in #55873.
This commit is contained in:
Nik Everett 2020-06-02 16:21:50 -04:00 committed by GitHub
parent 97a51272b0
commit 2a27c411fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 144 additions and 76 deletions

View File

@ -22,13 +22,13 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
@ -46,17 +46,16 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
protected final int requiredSize;
protected final int shardSize;
protected final ValuesSource.Numeric valuesSource;
protected final LongHash bucketOrds;
protected SortedNumericDocValues values;
protected final LongKeyedBucketOrds bucketOrds;
GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
int requiredSize, int shardSize, SearchContext aggregationContext,
Aggregator parent, Map<String, Object> metadata) throws IOException {
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, aggregationContext, parent, metadata);
this.valuesSource = valuesSource;
this.requiredSize = requiredSize;
this.shardSize = shardSize;
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
@ -70,11 +69,10 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
values = valuesSource.longValues(ctx);
SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
public void collect(int doc, long owningBucketOrd) throws IOException {
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
@ -82,7 +80,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
for (int i = 0; i < valuesCount; ++i) {
final long val = values.nextValue();
if (previous != val || i == 0) {
long bucketOrdinal = bucketOrds.add(val);
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
@ -108,31 +106,38 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), shardSize);
consumeBucketsAndMaybeBreak(size);
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
InternalGeoGridBucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {
spare = newEmptyBucket();
InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
consumeBucketsAndMaybeBreak(size);
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
InternalGeoGridBucket spare = null;
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (spare == null) {
spare = newEmptyBucket();
}
// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd[ordIdx][i] = ordered.pop();
}
// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);
spare.bucketOrd = i;
spare = ordered.insertWithOverflow(spare);
}
final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
list[i] = ordered.pop();
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata());
}
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
return new InternalAggregation[] {buildAggregation(name, requiredSize, Arrays.asList(list), metadata())};
return results;
}
@Override

View File

@ -35,8 +35,8 @@ public class GeoHashGridAggregator extends GeoGridAggregator<InternalGeoHashGrid
public GeoHashGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
int requiredSize, int shardSize, SearchContext aggregationContext,
Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
}
@Override

View File

@ -85,21 +85,18 @@ public class GeoHashGridAggregatorFactory extends ValuesSourceAggregatorFactory
throw new AggregationExecutionException("Registry miss-match - expected "
+ GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
}
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
requiredSize, shardSize, searchContext, parent, metadata);
requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
}
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(GeoHashGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
(GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
aggregationContext, parent, metadata) -> {
aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
Geohash::longEncode);
return new GeoHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
parent, metadata);
parent, collectsFromSingleBucket, metadata);
});
}
}

View File

@ -36,8 +36,8 @@ public class GeoTileGridAggregator extends GeoGridAggregator<InternalGeoTileGrid
public GeoTileGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
int requiredSize, int shardSize, SearchContext aggregationContext,
Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
}
@Override

View File

@ -83,21 +83,18 @@ public class GeoTileGridAggregatorFactory extends ValuesSourceAggregatorFactory
throw new AggregationExecutionException("Registry miss-match - expected "
+ GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
}
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
requiredSize, shardSize, searchContext, parent, metadata);
requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
}
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(GeoTileGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
(GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
aggregationContext, parent, metadata) -> {
aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
GeoTileUtils::longEncode);
return new GeoTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
parent, metadata);
parent, collectsFromSingleBucket, metadata);
});
}
}

View File

@ -32,9 +32,17 @@ import java.util.Map;
@FunctionalInterface
public interface GeoGridAggregatorSupplier extends AggregatorSupplier {
GeoGridAggregator build(String name, AggregatorFactories factories, ValuesSource valuesSource,
int precision, GeoBoundingBox geoBoundingBox, int requiredSize, int shardSize,
SearchContext aggregationContext, Aggregator parent,
Map<String, Object> metadata) throws IOException;
GeoGridAggregator build(
String name,
AggregatorFactories factories,
ValuesSource valuesSource,
int precision,
GeoBoundingBox geoBoundingBox,
int requiredSize,
int shardSize,
SearchContext aggregationContext,
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata
) throws IOException;
}

View File

@ -19,14 +19,17 @@
package org.elasticsearch.search.aggregations.bucket.geogrid;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.geo.GeoEncodingUtils;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoBoundingBoxTests;
@ -35,6 +38,8 @@ import org.elasticsearch.index.mapper.GeoPointFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import java.io.IOException;
@ -45,6 +50,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;
@ -109,18 +115,9 @@ public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket>
List<LatLonDocValuesField> points = new ArrayList<>();
Set<String> distinctHashesPerDoc = new HashSet<>();
for (int pointId = 0; pointId < numPoints; pointId++) {
double lat = (180d * randomDouble()) - 90d;
double lng = (360d * randomDouble()) - 180d;
// Precision-adjust longitude/latitude to avoid wrong bucket placement
// Internally, lat/lng get converted to 32 bit integers, losing some precision.
// This does not affect geohashing because geohash uses the same algorithm,
// but it does affect other bucketing algos, thus we need to do the same steps here.
lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
points.add(new LatLonDocValuesField(FIELD_NAME, lat, lng));
String hash = hashAsString(lng, lat, precision);
double[] latLng = randomLatLng();
points.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
String hash = hashAsString(latLng[1], latLng[0], precision);
if (distinctHashesPerDoc.contains(hash) == false) {
expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0) + 1);
}
@ -137,6 +134,60 @@ public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket>
});
}
/**
 * Checks that the grid aggregation reports correct per-bucket doc counts when it
 * runs as a sub-aggregation of a {@code terms} aggregation, i.e. when it collects
 * from many owning buckets instead of only the top level.
 */
public void testAsSubAgg() throws IOException {
    int precision = randomPrecision();
    // Expected doc counts, keyed first by the terms key ("t") and then by the grid hash.
    Map<String, Map<String, Long>> expectedCountPerTPerGeoHash = new TreeMap<>();
    List<List<IndexableField>> docs = new ArrayList<>();
    for (int i = 0; i < 30; i++) {
        // Single-letter keys deliberately collide so each terms bucket gets several docs.
        String t = randomAlphaOfLength(1);
        double[] latLng = randomLatLng();
        List<IndexableField> doc = new ArrayList<>();
        docs.add(doc);
        doc.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
        doc.add(new SortedSetDocValuesField("t", new BytesRef(t)));
        // hashAsString takes (lng, lat); randomLatLng returns [lat, lng].
        String hash = hashAsString(latLng[1], latLng[0], precision);
        Map<String, Long> expectedCountPerGeoHash = expectedCountPerTPerGeoHash.get(t);
        if (expectedCountPerGeoHash == null) {
            expectedCountPerGeoHash = new TreeMap<>();
            expectedCountPerTPerGeoHash.put(t, expectedCountPerGeoHash);
        }
        expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0L) + 1);
    }
    CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> iw.addDocuments(docs);
    // terms("t") on top with the grid agg ("gg") underneath; size covers every key.
    TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("t").field("t")
        .size(expectedCountPerTPerGeoHash.size())
        .subAggregation(createBuilder("gg").field(FIELD_NAME).precision(precision));
    Consumer<StringTerms> verify = (terms) -> {
        // Rebuild the same nested-map shape from the actual buckets and compare wholesale.
        Map<String, Map<String, Long>> actual = new TreeMap<>();
        for (StringTerms.Bucket tb: terms.getBuckets()) {
            InternalGeoGrid<?> gg = tb.getAggregations().get("gg");
            Map<String, Long> sub = new TreeMap<>();
            for (InternalGeoGridBucket<?> ggb : gg.getBuckets()) {
                sub.put(ggb.getKeyAsString(), ggb.getDocCount());
            }
            actual.put(tb.getKeyAsString(), sub);
        }
        assertThat(actual, equalTo(expectedCountPerTPerGeoHash));
    };
    testCase(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, verify, keywordField("t"), geoPointField(FIELD_NAME));
}
/**
 * Generates a random geo point, snapped to Lucene's 32 bit lat/lng encoding so the
 * expected bucket for the point matches what the aggregator computes.
 *
 * @return a two element array: {@code [lat, lng]}
 */
private double[] randomLatLng() {
    double lat = (180d * randomDouble()) - 90d;
    double lng = (360d * randomDouble()) - 180d;
    // Precision-adjust longitude/latitude to avoid wrong bucket placement
    // Internally, lat/lng get converted to 32 bit integers, losing some precision.
    // This does not affect geohashing because geohash uses the same algorithm,
    // but it does affect other bucketing algos, thus we need to do the same steps here.
    lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
    lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
    return new double[] {lat, lng};
}
public void testBounds() throws IOException {
final int numDocs = randomIntBetween(64, 256);
final GeoGridAggregationBuilder builder = createBuilder("_name");

View File

@ -300,13 +300,11 @@ public class NumericHistogramAggregatorTests extends AggregatorTestCase {
HistogramAggregationBuilder aggBuilder = new HistogramAggregationBuilder("my_agg")
.field("field")
.interval(5);
MappedFieldType fieldType = keywordField("field");
fieldType.setHasDocValues(true);
try (IndexReader reader = w.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader);
expectThrows(IllegalArgumentException.class, () -> {
search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
search(searcher, new MatchAllDocsQuery(), aggBuilder, keywordField("field"));
});
}
}

View File

@ -76,6 +76,7 @@ import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.FieldAliasMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.GeoPointFieldMapper;
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
@ -896,12 +897,23 @@ public abstract class AggregatorTestCase extends ESTestCase {
return result;
}
/**
 * Make a {@linkplain GeoPointFieldMapper.GeoPointFieldType} for a {@code geo_point},
 * named {@code name}, with doc values enabled.
 */
protected GeoPointFieldMapper.GeoPointFieldType geoPointField(String name) {
    GeoPointFieldMapper.GeoPointFieldType result = new GeoPointFieldMapper.GeoPointFieldType();
    result.setHasDocValues(true);
    result.setName(name);
    return result;
}
/**
 * Make a {@linkplain KeywordFieldMapper.KeywordFieldType} for a {@code keyword},
 * named {@code name}, with doc values enabled.
 */
protected KeywordFieldMapper.KeywordFieldType keywordField(String name) {
    KeywordFieldMapper.KeywordFieldType result = new KeywordFieldMapper.KeywordFieldType();
    result.setName(name);
    result.setHasDocValues(true);
    return result;
}

View File

@ -120,7 +120,7 @@ public class SpatialPlugin extends GeoPlugin implements MapperPlugin, SearchPlug
private void registerGeoShapeGridAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(GeoHashGridAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(),
(GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
aggregationContext, parent, metadata) -> {
aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
if (getLicenseState().isAllowed(XPackLicenseState.Feature.SPATIAL_GEO_GRID)) {
final GeoGridTiler tiler;
if (geoBoundingBox.isUnbounded()) {
@ -130,7 +130,7 @@ public class SpatialPlugin extends GeoPlugin implements MapperPlugin, SearchPlug
}
GeoShapeCellIdSource cellIdSource = new GeoShapeCellIdSource((GeoShapeValuesSource) valuesSource, precision, tiler);
GeoShapeHashGridAggregator agg = new GeoShapeHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize,
aggregationContext, parent, metadata);
aggregationContext, parent, collectsFromSingleBucket, metadata);
// this would ideally be something set in an immutable way on the ValuesSource
cellIdSource.setCircuitBreakerConsumer(agg::addRequestBytes);
return agg;
@ -140,7 +140,7 @@ public class SpatialPlugin extends GeoPlugin implements MapperPlugin, SearchPlug
builder.register(GeoTileGridAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(),
(GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
aggregationContext, parent, metadata) -> {
aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
if (getLicenseState().isAllowed(XPackLicenseState.Feature.SPATIAL_GEO_GRID)) {
final GeoGridTiler tiler;
if (geoBoundingBox.isUnbounded()) {
@ -150,7 +150,7 @@ public class SpatialPlugin extends GeoPlugin implements MapperPlugin, SearchPlug
}
GeoShapeCellIdSource cellIdSource = new GeoShapeCellIdSource((GeoShapeValuesSource) valuesSource, precision, tiler);
GeoShapeTileGridAggregator agg = new GeoShapeTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize,
aggregationContext, parent, metadata);
aggregationContext, parent, collectsFromSingleBucket, metadata);
// this would ideally be something set in an immutable way on the ValuesSource
cellIdSource.setCircuitBreakerConsumer(agg::addRequestBytes);
return agg;

View File

@ -18,8 +18,8 @@ import java.util.Map;
public class GeoShapeHashGridAggregator extends GeoHashGridAggregator {
public GeoShapeHashGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, int requiredSize,
int shardSize, SearchContext aggregationContext, Aggregator parent,
Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
}
/**

View File

@ -18,8 +18,8 @@ import java.util.Map;
public class GeoShapeTileGridAggregator extends GeoTileGridAggregator {
public GeoShapeTileGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, int requiredSize,
int shardSize, SearchContext aggregationContext, Aggregator parent,
Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
}
/**

View File

@ -63,7 +63,7 @@ public class SpatialPluginTests extends ESTestCase {
License.OperationMode.compare(operationMode, License.OperationMode.GOLD) < 0) {
ElasticsearchSecurityException exception = expectThrows(ElasticsearchSecurityException.class,
() -> supplier.build(null, null, null, 0, null,
0,0, null, null, null));
0,0, null, null, false, null));
assertThat(exception.getMessage(),
equalTo("current license is non-compliant for [" + builderName + " aggregation on geo_shape fields]"));
}