Fix bug when initializing HyperLogLogPlusPlusSparse (#62602) (#62702)

This is a follow-up to #62480, where we were oversizing one of the arrays during initialisation. In addition, this commit prevents a possible CircuitBreaker leak if initialisation fails partway through.
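The construction-time part of the fix follows a standard allocate-then-release-on-failure shape. The sketch below illustrates that shape in isolation; it is a hypothetical example, not Elasticsearch code, and the `TrackedBuffer`/`TrackedAllocator` names are made-up stand-ins for `IntArray` and `BigArrays`.

// Minimal sketch of the allocate-then-release-on-failure pattern (hypothetical
// types, not Elasticsearch APIs): acquire breaker-tracked buffers one by one and,
// if any allocation throws, release whatever was already acquired so the
// circuit-breaker accounting does not leak.
final class SafeInit implements AutoCloseable {

    /** A buffer whose size was charged to a circuit breaker on creation. */
    interface TrackedBuffer extends AutoCloseable {
        @Override
        void close(); // returns the reserved bytes to the breaker
    }

    /** Stand-in for an allocator such as BigArrays: may throw when the breaker trips. */
    interface TrackedAllocator {
        TrackedBuffer newIntBuffer(long size);
    }

    private static void closeAll(TrackedBuffer... buffers) {
        for (TrackedBuffer buffer : buffers) {
            if (buffer != null) {
                buffer.close();
            }
        }
    }

    private final TrackedBuffer values;
    private final TrackedBuffer sizes;

    SafeInit(TrackedAllocator allocator, long initialSize, long capacity) {
        TrackedBuffer values = null;
        TrackedBuffer sizes = null;
        boolean success = false;
        try {
            values = allocator.newIntBuffer(initialSize * capacity); // one slot per value
            sizes = allocator.newIntBuffer(initialSize);             // one counter per bucket
            success = true;
        } finally {
            if (success == false) {
                // An allocation tripped the breaker: release the partial state
                // before the exception propagates.
                closeAll(values, sizes);
            }
        }
        this.values = values;
        this.sizes = sizes;
    }

    @Override
    public void close() {
        closeAll(values, sizes);
    }
}

The actual change applies the same shape with `BigArrays.newIntArray` and `Releasables.close`, as shown in the diff below, and additionally shrinks the `sizes` array to one entry per bucket.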
Ignacio Vera 2020-09-21 17:30:40 +02:00 committed by GitHub
parent 13e28b85ff
commit cadd5dc53f
2 changed files with 71 additions and 3 deletions

HyperLogLogPlusPlusSparse.java

@@ -97,8 +97,20 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
         super(p);
         this.bigArrays = bigArrays;
         this.capacity = capacity;
-        values = bigArrays.newIntArray(initialSize * capacity);
-        sizes = bigArrays.newIntArray(initialSize * capacity);
+        IntArray values = null;
+        IntArray sizes = null;
+        boolean success = false;
+        try {
+            values = bigArrays.newIntArray(initialSize * capacity);
+            sizes = bigArrays.newIntArray(initialSize);
+            success = true;
+        } finally {
+            if (success == false) {
+                Releasables.close(values, sizes);
+            }
+        }
+        this.values = values;
+        this.sizes = sizes;
         iterator = new LinearCountingIterator(this, capacity);
     }

@@ -140,7 +152,7 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
         int size = size(bucketOrd);
         if (size == 0) {
             sizes = bigArrays.grow(sizes, bucketOrd + 1);
-            values = bigArrays.grow(values, (bucketOrd * capacity) + capacity);
+            values = bigArrays.grow(values, (bucketOrd + 1) * capacity);
         }
         values.set(index(bucketOrd, size), value);
         return sizes.increment(bucketOrd, 1);

HyperLogLogPlusPlusSparseTests.java

@@ -20,17 +20,36 @@
 package org.elasticsearch.search.aggregations.metrics;

 import com.carrotsearch.hppc.BitMixer;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matchers;

 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;

 import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MAX_PRECISION;
 import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MIN_PRECISION;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

 public class HyperLogLogPlusPlusSparseTests extends ESTestCase {

+    public void testBasic() {
+        final int p = randomIntBetween(MIN_PRECISION, MAX_PRECISION);
+        HyperLogLogPlusPlusSparse sparse = new HyperLogLogPlusPlusSparse(p, BigArrays.NON_RECYCLING_INSTANCE, 10, 1);
+        AbstractLinearCounting.HashesIterator iterator = sparse.getLinearCounting(randomIntBetween(1, 10));
+        assertEquals(0, iterator.size());
+        IllegalArgumentException ex =
+            expectThrows(IllegalArgumentException.class, () -> sparse.getHyperLogLog(randomIntBetween(1, 10)));
+        assertThat(ex.getMessage(), Matchers.containsString("Implementation does not support HLL structures"));
+    }
+
     public void testEquivalence() throws IOException {
         final int p = randomIntBetween(MIN_PRECISION, MAX_PRECISION);
         final HyperLogLogPlusPlus single = new HyperLogLogPlusPlus(p, BigArrays.NON_RECYCLING_INSTANCE, 0);
@@ -75,4 +94,41 @@ public class HyperLogLogPlusPlusSparseTests extends ESTestCase {
         assertTrue(first.equals(firstBucket, second, secondBucket));
         assertTrue(second.equals(secondBucket, first, firstBucket));
     }
+
+    public void testCircuitBreakerOnConstruction() {
+        int whenToBreak = randomInt(10);
+        AtomicLong total = new AtomicLong();
+        CircuitBreakerService breakerService = mock(CircuitBreakerService.class);
+        when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST) {
+
+            private int countDown = whenToBreak;
+
+            @Override
+            public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
+                if (countDown-- == 0) {
+                    throw new CircuitBreakingException("test error", bytes, Long.MAX_VALUE, Durability.TRANSIENT);
+                }
+                total.addAndGet(bytes);
+                return total.get();
+            }
+
+            @Override
+            public long addWithoutBreaking(long bytes) {
+                total.addAndGet(bytes);
+                return total.get();
+            }
+        });
+        BigArrays bigArrays = new BigArrays(null, breakerService, CircuitBreaker.REQUEST).withCircuitBreaking();
+        final int p = randomIntBetween(AbstractCardinalityAlgorithm.MIN_PRECISION, AbstractCardinalityAlgorithm.MAX_PRECISION);
+        try {
+            for (int i = 0; i < whenToBreak + 1; ++i) {
+                final HyperLogLogPlusPlusSparse subject = new HyperLogLogPlusPlusSparse(p, bigArrays, 1, 1);
+                subject.close();
+            }
+            fail("Must fail");
+        } catch (CircuitBreakingException e) {
+            // OK
+        }
+        assertThat(total.get(), CoreMatchers.equalTo(0L));
+    }
 }