From cadd5dc53f8d58f03aaaa1e343e2466b4d53a567 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Mon, 21 Sep 2020 17:30:40 +0200 Subject: [PATCH] Fix bug when initializing HyperLogLogPlusPlusSparse (#62602) (#62702) This is a follow up of #62480 where we are oversizing one array when initialising. In addition it prevents a possible CircuitBreaker leak during initialisation. --- .../metrics/HyperLogLogPlusPlusSparse.java | 18 +++++- .../HyperLogLogPlusPlusSparseTests.java | 56 +++++++++++++++++++ 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java index 6db20006ab7..d18e93e4df5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparse.java @@ -97,8 +97,20 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem super(p); this.bigArrays = bigArrays; this.capacity = capacity; - values = bigArrays.newIntArray(initialSize * capacity); - sizes = bigArrays.newIntArray(initialSize * capacity); + IntArray values = null; + IntArray sizes = null; + boolean success = false; + try { + values = bigArrays.newIntArray(initialSize * capacity); + sizes = bigArrays.newIntArray(initialSize); + success = true; + } finally { + if (success == false) { + Releasables.close(values, sizes); + } + } + this.values = values; + this.sizes = sizes; iterator = new LinearCountingIterator(this, capacity); } @@ -140,7 +152,7 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem int size = size(bucketOrd); if (size == 0) { sizes = bigArrays.grow(sizes, bucketOrd + 1); - values = bigArrays.grow(values, (bucketOrd * capacity) + capacity); + values = bigArrays.grow(values, (bucketOrd + 1) * capacity); } values.set(index(bucketOrd, size), value); return sizes.increment(bucketOrd, 1); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java index 2e7f1fb24ba..d702f04927f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/HyperLogLogPlusPlusSparseTests.java @@ -20,17 +20,36 @@ package org.elasticsearch.search.aggregations.metrics; import com.carrotsearch.hppc.BitMixer; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MAX_PRECISION; import static org.elasticsearch.search.aggregations.metrics.AbstractHyperLogLog.MIN_PRECISION; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class HyperLogLogPlusPlusSparseTests extends ESTestCase { + public void testBasic() { + final int p = randomIntBetween(MIN_PRECISION, MAX_PRECISION); + HyperLogLogPlusPlusSparse sparse = new HyperLogLogPlusPlusSparse(p, BigArrays.NON_RECYCLING_INSTANCE, 10, 1); + AbstractLinearCounting.HashesIterator iterator = sparse.getLinearCounting(randomIntBetween(1, 10)); + assertEquals(0, iterator.size()); + IllegalArgumentException ex = + expectThrows(IllegalArgumentException.class, () -> sparse.getHyperLogLog(randomIntBetween(1, 10))); + assertThat(ex.getMessage(), Matchers.containsString("Implementation does not support HLL structures")); + } + public void testEquivalence() throws IOException { final int p = randomIntBetween(MIN_PRECISION, MAX_PRECISION); final HyperLogLogPlusPlus single = new HyperLogLogPlusPlus(p, BigArrays.NON_RECYCLING_INSTANCE, 0); @@ -75,4 +94,41 @@ public class HyperLogLogPlusPlusSparseTests extends ESTestCase { assertTrue(first.equals(firstBucket, second, secondBucket)); assertTrue(second.equals(secondBucket, first, firstBucket)); } + + public void testCircuitBreakerOnConstruction() { + int whenToBreak = randomInt(10); + AtomicLong total = new AtomicLong(); + CircuitBreakerService breakerService = mock(CircuitBreakerService.class); + when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST) { + private int countDown = whenToBreak; + @Override + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + if (countDown-- == 0) { + throw new CircuitBreakingException("test error", bytes, Long.MAX_VALUE, Durability.TRANSIENT); + } + total.addAndGet(bytes); + return total.get(); + } + + @Override + public long addWithoutBreaking(long bytes) { + total.addAndGet(bytes); + return total.get(); + } + }); + BigArrays bigArrays = new BigArrays(null, breakerService, CircuitBreaker.REQUEST).withCircuitBreaking(); + final int p = randomIntBetween(AbstractCardinalityAlgorithm.MIN_PRECISION, AbstractCardinalityAlgorithm.MAX_PRECISION); + try { + for (int i = 0; i < whenToBreak + 1; ++i) { + final HyperLogLogPlusPlusSparse subject = new HyperLogLogPlusPlusSparse(p, bigArrays, 1, 1); + subject.close(); + } + fail("Must fail"); + } catch (CircuitBreakingException e) { + // OK + } + + assertThat(total.get(), CoreMatchers.equalTo(0L)); + } + }