Cardinality request breaker leak (#62685)

If HyperLogLogPlusPlus failed during construction, it would
not release already allocated resources, causing the request
circuit breaker to not be adjusted down.

Closes #62439
This commit is contained in:
Henning Andersen 2020-09-21 14:30:47 +02:00 committed by Henning Andersen
parent 4537561692
commit 0c4cfe4c44
2 changed files with 62 additions and 3 deletions

View File

@ -78,9 +78,23 @@ public final class HyperLogLogPlusPlus extends AbstractHyperLogLogPlusPlus {
public HyperLogLogPlusPlus(int precision, BigArrays bigArrays, long initialBucketCount) {
super(precision);
hll = new HyperLogLog(bigArrays, initialBucketCount, precision);
lc = new LinearCounting(bigArrays, initialBucketCount, precision, hll);
algorithm = new BitArray(1, bigArrays);
HyperLogLog hll = null;
LinearCounting lc = null;
BitArray algorithm = null;
boolean success = false;
try {
hll = new HyperLogLog(bigArrays, initialBucketCount, precision);
lc = new LinearCounting(bigArrays, initialBucketCount, precision, hll);
algorithm = new BitArray(1, bigArrays);
success = true;
} finally {
if (success == false) {
Releasables.close(hll, lc, algorithm);
}
}
this.hll = hll;
this.lc = lc;
this.algorithm = algorithm;
}
@Override

View File

@ -21,12 +21,21 @@ package org.elasticsearch.search.aggregations.metrics;
import com.carrotsearch.hppc.BitMixer;
import com.carrotsearch.hppc.IntHashSet;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MAX_PRECISION;
import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MIN_PRECISION;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class HyperLogLogPlusPlusTests extends ESTestCase {
public void testEncodeDecode() {
@ -126,4 +135,40 @@ public class HyperLogLogPlusPlusTests extends ESTestCase {
assertEquals(18, HyperLogLogPlusPlus.precisionFromThreshold(100000));
assertEquals(18, HyperLogLogPlusPlus.precisionFromThreshold(1000000));
}
public void testCircuitBreakerOnConstruction() {
int whenToBreak = randomInt(10);
AtomicLong total = new AtomicLong();
CircuitBreakerService breakerService = mock(CircuitBreakerService.class);
when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST) {
private int countDown = whenToBreak;
@Override
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
if (countDown-- == 0) {
throw new CircuitBreakingException("test error", bytes, Long.MAX_VALUE, Durability.TRANSIENT);
}
total.addAndGet(bytes);
return total.get();
}
@Override
public long addWithoutBreaking(long bytes) {
total.addAndGet(bytes);
return total.get();
}
});
BigArrays bigArrays = new BigArrays(null, breakerService, CircuitBreaker.REQUEST).withCircuitBreaking();
final int p = randomIntBetween(HyperLogLogPlusPlus.MIN_PRECISION, HyperLogLogPlusPlus.MAX_PRECISION);
try {
for (int i = 0; i < whenToBreak + 1; ++i) {
final HyperLogLogPlusPlus subject = new HyperLogLogPlusPlus(p, bigArrays, 0);
subject.close();
}
fail("Must fail");
} catch (CircuitBreakingException e) {
// OK
}
assertThat(total.get(), equalTo(0L));
}
}