mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
There was a bug in the geoshape circuit-breaker check where the hash values array was being allocated before its new size was accounted for by the circuit breaker. Fixes #57847.
This commit is contained in:
parent
48df9b1a0e
commit
c9ab7bb651
@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/**
|
||||
* Base implementation that throws an {@link IOException} for the
|
||||
@ -31,6 +32,14 @@ import java.io.IOException;
|
||||
*/
|
||||
public abstract class AbstractSortingNumericDocValues extends SortingNumericDocValues {
|
||||
|
||||
public AbstractSortingNumericDocValues() {
|
||||
super();
|
||||
}
|
||||
|
||||
public AbstractSortingNumericDocValues(LongConsumer circuitBreakerConsumer) {
|
||||
super(circuitBreakerConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int docID() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
@ -24,6 +24,8 @@ import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.InPlaceMergeSorter;
|
||||
import org.apache.lucene.util.Sorter;
|
||||
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/**
|
||||
* Base class for building {@link SortedNumericDocValues} instances based on unsorted content.
|
||||
*/
|
||||
@ -33,8 +35,13 @@ public abstract class SortingNumericDocValues extends SortedNumericDocValues {
|
||||
protected long[] values;
|
||||
protected int valuesCursor;
|
||||
private final Sorter sorter;
|
||||
private LongConsumer circuitBreakerConsumer;
|
||||
|
||||
protected SortingNumericDocValues() {
|
||||
this(l -> {});
|
||||
}
|
||||
|
||||
protected SortingNumericDocValues(LongConsumer circuitBreakerConsumer) {
|
||||
values = new long[1];
|
||||
valuesCursor = 0;
|
||||
sorter = new InPlaceMergeSorter() {
|
||||
@ -51,6 +58,9 @@ public abstract class SortingNumericDocValues extends SortedNumericDocValues {
|
||||
return Long.compare(values[i], values[j]);
|
||||
}
|
||||
};
|
||||
this.circuitBreakerConsumer = circuitBreakerConsumer;
|
||||
// account for initial values size of 1
|
||||
this.circuitBreakerConsumer.accept(Long.BYTES);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -59,8 +69,25 @@ public abstract class SortingNumericDocValues extends SortedNumericDocValues {
|
||||
*/
|
||||
protected final void resize(int newSize) {
|
||||
count = newSize;
|
||||
values = ArrayUtil.grow(values, count);
|
||||
valuesCursor = 0;
|
||||
|
||||
if (newSize <= values.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Array is expected to grow so increment the circuit breaker
|
||||
// to include both the additional bytes used by the grown array
|
||||
// as well as the overhead of keeping both arrays in memory while
|
||||
// copying.
|
||||
long oldValuesSizeInBytes = values.length * Long.BYTES;
|
||||
int newValuesLength = ArrayUtil.oversize(newSize, Long.BYTES);
|
||||
circuitBreakerConsumer.accept(newValuesLength * Long.BYTES);
|
||||
|
||||
// resize
|
||||
values = ArrayUtil.growExact(values, newValuesLength);
|
||||
|
||||
// account for freeing the old values array
|
||||
circuitBreakerConsumer.accept(-oldValuesSizeInBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -9,17 +9,17 @@ package org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid;
|
||||
import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/** Sorted numeric doc values for precision 0 */
|
||||
class AllCellValues extends ByteTrackingSortingNumericDocValues {
|
||||
private MultiGeoShapeValues geoValues;
|
||||
|
||||
protected AllCellValues(MultiGeoShapeValues geoValues, GeoGridTiler tiler, Consumer<Long> circuitBreakerConsumer) {
|
||||
protected AllCellValues(MultiGeoShapeValues geoValues, GeoGridTiler tiler, LongConsumer circuitBreakerConsumer) {
|
||||
super(circuitBreakerConsumer);
|
||||
this.geoValues = geoValues;
|
||||
resize(1);
|
||||
values[0] = tiler.encode(0, 0, 0);
|
||||
circuitBreakerConsumer.accept((long) Long.BYTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -8,9 +8,18 @@ package org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid;
|
||||
|
||||
import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues;
|
||||
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/**
|
||||
* Wrapper class for GeoGrid to expose the protected values array for testing
|
||||
*/
|
||||
abstract class ByteTrackingSortingNumericDocValues extends AbstractSortingNumericDocValues {
|
||||
|
||||
public long getValuesBytes() {
|
||||
ByteTrackingSortingNumericDocValues(LongConsumer circuitBreakerConsumer) {
|
||||
super(circuitBreakerConsumer);
|
||||
}
|
||||
|
||||
long getValuesBytes() {
|
||||
return values.length * Long.BYTES;
|
||||
}
|
||||
}
|
||||
|
@ -16,13 +16,13 @@ import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;
|
||||
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSource;
|
||||
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSourceType;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
public class GeoShapeCellIdSource extends ValuesSource.Numeric {
|
||||
private final GeoShapeValuesSource valuesSource;
|
||||
private final int precision;
|
||||
private final GeoGridTiler encoder;
|
||||
private Consumer<Long> circuitBreakerConsumer;
|
||||
private LongConsumer circuitBreakerConsumer;
|
||||
|
||||
public GeoShapeCellIdSource(GeoShapeValuesSource valuesSource, int precision, GeoGridTiler encoder) {
|
||||
this.valuesSource = valuesSource;
|
||||
@ -36,7 +36,7 @@ public class GeoShapeCellIdSource extends ValuesSource.Numeric {
|
||||
* accessible from within the values-source. Problem is that this values-source needs to
|
||||
* be created and passed to the aggregator before we have access to this functionality.
|
||||
*/
|
||||
public void setCircuitBreakerConsumer(Consumer<Long> circuitBreakerConsumer) {
|
||||
public void setCircuitBreakerConsumer(LongConsumer circuitBreakerConsumer) {
|
||||
this.circuitBreakerConsumer = circuitBreakerConsumer;
|
||||
}
|
||||
|
||||
|
@ -9,22 +9,20 @@ package org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid;
|
||||
import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/** Sorted numeric doc values for geo shapes */
|
||||
class GeoShapeCellValues extends ByteTrackingSortingNumericDocValues {
|
||||
private final MultiGeoShapeValues geoShapeValues;
|
||||
private final Consumer<Long> circuitBreakerConsumer;
|
||||
protected int precision;
|
||||
protected GeoGridTiler tiler;
|
||||
|
||||
protected GeoShapeCellValues(MultiGeoShapeValues geoShapeValues, int precision, GeoGridTiler tiler,
|
||||
Consumer<Long> circuitBreakerConsumer) {
|
||||
LongConsumer circuitBreakerConsumer) {
|
||||
super(circuitBreakerConsumer);
|
||||
this.geoShapeValues = geoShapeValues;
|
||||
this.precision = precision;
|
||||
this.tiler = tiler;
|
||||
this.circuitBreakerConsumer = circuitBreakerConsumer;
|
||||
circuitBreakerConsumer.accept((long) Long.BYTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -45,18 +43,13 @@ class GeoShapeCellValues extends ByteTrackingSortingNumericDocValues {
|
||||
return values;
|
||||
}
|
||||
|
||||
protected void add(int idx, long value) {
|
||||
values[idx] = value;
|
||||
void resizeCell(int newSize) {
|
||||
resize(newSize);
|
||||
}
|
||||
|
||||
void resizeCell(int newSize) {
|
||||
int oldValuesLength = values.length;
|
||||
resize(newSize);
|
||||
int newValuesLength = values.length;
|
||||
if (newValuesLength > oldValuesLength) {
|
||||
long bytesDiff = (newValuesLength - oldValuesLength) * Long.BYTES;
|
||||
circuitBreakerConsumer.accept(bytesDiff);
|
||||
}
|
||||
|
||||
protected void add(int idx, long value) {
|
||||
values[idx] = value;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -16,7 +16,6 @@ import org.elasticsearch.geo.GeometryTestUtils;
|
||||
import org.elasticsearch.geometry.Geometry;
|
||||
import org.elasticsearch.geometry.LinearRing;
|
||||
import org.elasticsearch.geometry.MultiLine;
|
||||
import org.elasticsearch.geometry.MultiPoint;
|
||||
import org.elasticsearch.geometry.MultiPolygon;
|
||||
import org.elasticsearch.geometry.Point;
|
||||
import org.elasticsearch.geometry.Polygon;
|
||||
@ -34,9 +33,11 @@ import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;
|
||||
import org.elasticsearch.xpack.spatial.index.fielddata.TriangleTreeReader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.List;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils.LATITUDE_MASK;
|
||||
import static org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils.NORMALIZED_LATITUDE_MASK;
|
||||
@ -50,7 +51,7 @@ import static org.hamcrest.Matchers.equalTo;
|
||||
public class GeoGridTilerTests extends ESTestCase {
|
||||
private static final GeoTileGridTiler GEOTILE = new GeoTileGridTiler();
|
||||
private static final GeoHashGridTiler GEOHASH = new GeoHashGridTiler();
|
||||
private static final Consumer<Long> NOOP_BREAKER = (l) -> {};
|
||||
private static final LongConsumer NOOP_BREAKER = (l) -> {};
|
||||
|
||||
public void testGeoTile() throws Exception {
|
||||
double x = randomDouble();
|
||||
@ -464,33 +465,40 @@ public class GeoGridTilerTests extends ESTestCase {
|
||||
}
|
||||
|
||||
private void testCircuitBreaker(GeoGridTiler tiler) throws IOException {
|
||||
MultiPoint multiPoint = GeometryTestUtils.randomMultiPoint(false);
|
||||
int precision = randomIntBetween(0, 6);
|
||||
TriangleTreeReader reader = triangleTreeReader(multiPoint, GeoShapeCoordinateEncoder.INSTANCE);
|
||||
Polygon polygon = GeometryTestUtils.randomPolygon(false);
|
||||
int precision = randomIntBetween(0, 5);
|
||||
TriangleTreeReader reader = triangleTreeReader(polygon, GeoShapeCoordinateEncoder.INSTANCE);
|
||||
MultiGeoShapeValues.GeoShapeValue value = new MultiGeoShapeValues.GeoShapeValue(reader);
|
||||
|
||||
final long numBytes;
|
||||
List<Long> byteChangeHistory = new ArrayList<>();
|
||||
if (precision == 0) {
|
||||
AllCellValues values = new AllCellValues(null, tiler, NOOP_BREAKER);
|
||||
numBytes = values.getValuesBytes();
|
||||
new AllCellValues(null, tiler, byteChangeHistory::add);
|
||||
} else {
|
||||
GeoShapeCellValues values = new GeoShapeCellValues(null, precision, tiler, NOOP_BREAKER);
|
||||
GeoShapeCellValues values = new GeoShapeCellValues(null, precision, tiler, byteChangeHistory::add);
|
||||
tiler.setValues(values, value, precision);
|
||||
numBytes = values.getValuesBytes();
|
||||
}
|
||||
|
||||
final long maxNumBytes;
|
||||
final long curNumBytes;
|
||||
if (byteChangeHistory.size() == 1) {
|
||||
curNumBytes = maxNumBytes = byteChangeHistory.get(byteChangeHistory.size() - 1);
|
||||
} else {
|
||||
long oldNumBytes = -byteChangeHistory.get(byteChangeHistory.size() - 1);
|
||||
curNumBytes = byteChangeHistory.get(byteChangeHistory.size() - 2);
|
||||
maxNumBytes = oldNumBytes + curNumBytes;
|
||||
}
|
||||
|
||||
CircuitBreakerService service = new HierarchyCircuitBreakerService(Settings.EMPTY,
|
||||
Collections.singletonList(new BreakerSettings("limited", numBytes - 1, 1.0)),
|
||||
Collections.singletonList(new BreakerSettings("limited", maxNumBytes - 1, 1.0)),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
CircuitBreaker limitedBreaker = service.getBreaker("limited");
|
||||
|
||||
Consumer<Long> circuitBreakerConsumer = (l) -> limitedBreaker.addEstimateBytesAndMaybeBreak(l, "agg");
|
||||
LongConsumer circuitBreakerConsumer = (l) -> limitedBreaker.addEstimateBytesAndMaybeBreak(l, "agg");
|
||||
expectThrows(CircuitBreakingException.class, () -> {
|
||||
GeoShapeCellValues values = new GeoShapeCellValues(null, precision, tiler, circuitBreakerConsumer);
|
||||
tiler.setValues(values, value, precision);
|
||||
assertThat(values.getValuesBytes(), equalTo(numBytes));
|
||||
assertThat(limitedBreaker.getUsed(), equalTo(numBytes));
|
||||
assertThat(values.getValuesBytes(), equalTo(curNumBytes));
|
||||
assertThat(limitedBreaker.getUsed(), equalTo(curNumBytes));
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -80,7 +80,7 @@
|
||||
indices.refresh: {}
|
||||
|
||||
- do:
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[28648\/27.9kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[42760\/41.7kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
search:
|
||||
rest_total_hits_as_int: true
|
||||
index: locations
|
||||
@ -93,7 +93,7 @@
|
||||
field: location
|
||||
|
||||
- do:
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[28648\/27.9kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[42760\/41.7kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
search:
|
||||
rest_total_hits_as_int: true
|
||||
index: locations
|
||||
|
@ -82,7 +82,7 @@
|
||||
indices.refresh: {}
|
||||
|
||||
- do:
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[26344\/25.7kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[30160\/29.4kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
search:
|
||||
rest_total_hits_as_int: true
|
||||
index: locations
|
||||
@ -95,7 +95,7 @@
|
||||
field: location
|
||||
|
||||
- do:
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[26344\/25.7kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
catch: /data for \[<agg \[grid\]>\] would be \[30160\/29.4kb\], which is larger than the limit of \[25600\/25kb\]/
|
||||
search:
|
||||
rest_total_hits_as_int: true
|
||||
index: locations
|
||||
|
Loading…
x
Reference in New Issue
Block a user