diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java index 9cc5cf7bd81..24440807449 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.EngineException; import java.util.concurrent.locks.Lock; @@ -57,6 +58,19 @@ public class ReleasableLock implements Releasable { return this; } + /** + * Tries to acquire the underlying lock, waiting at most the given timeout. + * + * @param timeout maximum time to wait; its duration and time unit are passed to {@code lock.tryLock} + * @return this instance if the lock was acquired, or {@code null} if it could not be acquired within the timeout; + * callers must check for {@code null} before relying on the lock being held + * @throws InterruptedException propagated from the underlying timed {@code tryLock} call + */ + public ReleasableLock tryAcquire(TimeValue timeout) throws InterruptedException { + boolean locked = lock.tryLock(timeout.duration(), timeout.timeUnit()); + if (locked) { + assert addCurrentThread(); + return this; + } else { + return null; + } + } + private boolean addCurrentThread() { final Integer current = holdingThreads.get(); holdingThreads.set(current == null ? 
1 : current + 1); diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 86c407f8040..1a9648f591d 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -22,6 +22,7 @@ package org.elasticsearch.indices.breaker; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -30,8 +31,14 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.monitor.jvm.GcNames; +import org.elasticsearch.monitor.jvm.JvmInfo; +import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.util.ArrayList; @@ -40,6 +47,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING; 
@@ -104,7 +114,14 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { // Tripped count for when redistribution was attempted but wasn't successful private final AtomicLong parentTripCount = new AtomicLong(0); + private final OverLimitStrategy overLimitStrategy; + public HierarchyCircuitBreakerService(Settings settings, List customBreakers, ClusterSettings clusterSettings) { + this(settings, customBreakers, clusterSettings, HierarchyCircuitBreakerService::createOverLimitStrategy); + } + + HierarchyCircuitBreakerService(Settings settings, List customBreakers, ClusterSettings clusterSettings, + Function overLimitStrategyFactory) { super(); HashMap childCircuitBreakers = new HashMap<>(); childCircuitBreakers.put(CircuitBreaker.FIELDDATA, validateAndCreateBreaker( @@ -168,6 +185,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { CIRCUIT_BREAKER_OVERHEAD_SETTING, (name, updatedValues) -> updateCircuitBreakerSettings(name, updatedValues.v1(), updatedValues.v2()), (s, t) -> {}); + + this.overLimitStrategy = overLimitStrategyFactory.apply(this.trackRealMemoryUsage); } private void updateCircuitBreakerSettings(String name, ByteSizeValue newLimit, Double newOverhead) { @@ -231,7 +250,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { breaker.getTrippedCount()); } - private static class MemoryUsage { + static class MemoryUsage { final long baseUsage; final long totalUsage; final long transientChildUsage; @@ -268,6 +287,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { //package private to allow overriding it in tests long currentMemoryUsage() { + return realMemoryUsage(); + } + + static long realMemoryUsage() { try { return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); } catch (IllegalArgumentException ex) { @@ -290,7 +313,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public void checkParentLimit(long newBytesReserved, 
String label) throws CircuitBreakingException { final MemoryUsage memoryUsed = memoryUsed(newBytesReserved); long parentLimit = this.parentSettings.getLimit(); - if (memoryUsed.totalUsage > parentLimit) { + if (memoryUsed.totalUsage > parentLimit && overLimitStrategy.overLimit(memoryUsed).totalUsage > parentLimit) { this.parentTripCount.incrementAndGet(); final StringBuilder message = new StringBuilder("[parent] Data too large, data for [" + label + "]" + " would be [" + memoryUsed.totalUsage + "/" + new ByteSizeValue(memoryUsed.totalUsage) + "]" + @@ -334,4 +357,163 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { this, breakerSettings.getName()); } + + static OverLimitStrategy createOverLimitStrategy(boolean trackRealMemoryUsage) { + JvmInfo jvmInfo = JvmInfo.jvmInfo(); + if (trackRealMemoryUsage && jvmInfo.useG1GC().equals("true") + // messing with GC is "dangerous" so we apply an escape hatch. Not intended to be used. + && Booleans.parseBoolean(System.getProperty("es.real_memory_circuit_breaker.g1_over_limit_strategy.enabled"), true)) { + TimeValue lockTimeout = TimeValue.timeValueMillis( + Integer.parseInt(System.getProperty("es.real_memory_circuit_breaker.g1_over_limit_strategy.lock_timeout_ms", "500")) + ); + // hardcode interval, do not want any tuning of it outside code changes. 
+ return new G1OverLimitStrategy(jvmInfo, HierarchyCircuitBreakerService::realMemoryUsage, createYoungGcCountSupplier(), + System::currentTimeMillis, 5000, lockTimeout); + } else { + return memoryUsed -> memoryUsed; + } + } + + static LongSupplier createYoungGcCountSupplier() { + List youngBeans = + ManagementFactory.getGarbageCollectorMXBeans().stream() + .filter(mxBean -> GcNames.getByGcName(mxBean.getName(), mxBean.getName()).equals(GcNames.YOUNG)) + .collect(Collectors.toList()); + assert youngBeans.size() == 1; + assert youngBeans.get(0).getCollectionCount() != -1 : "G1 must support getting collection count"; + + if (youngBeans.size() == 1) { + return youngBeans.get(0)::getCollectionCount; + } else { + logger.warn("Unable to find young generation collector, G1 over limit strategy might be impacted [{}]", youngBeans); + return () -> -1; + } + } + + interface OverLimitStrategy { + MemoryUsage overLimit(MemoryUsage memoryUsed); + } + + static class G1OverLimitStrategy implements OverLimitStrategy { + private final long g1RegionSize; + private final LongSupplier currentMemoryUsageSupplier; + private final LongSupplier gcCountSupplier; + private final LongSupplier timeSupplier; + private final TimeValue lockTimeout; + private final long maxHeap; + + private long lastCheckTime = Long.MIN_VALUE; + private final long minimumInterval; + + private long blackHole; + private final ReleasableLock lock = new ReleasableLock(new ReentrantLock()); + + G1OverLimitStrategy(JvmInfo jvmInfo, LongSupplier currentMemoryUsageSupplier, + LongSupplier gcCountSupplier, + LongSupplier timeSupplier, long minimumInterval, TimeValue lockTimeout) { + this.lockTimeout = lockTimeout; + assert minimumInterval > 0; + this.currentMemoryUsageSupplier = currentMemoryUsageSupplier; + this.gcCountSupplier = gcCountSupplier; + this.timeSupplier = timeSupplier; + this.minimumInterval = minimumInterval; + this.maxHeap = jvmInfo.getMem().getHeapMax().getBytes(); + long g1RegionSize = 
jvmInfo.getG1RegionSize(); + if (g1RegionSize <= 0) { + this.g1RegionSize = fallbackRegionSize(jvmInfo); + } else { + this.g1RegionSize = g1RegionSize; + } + } + + static long fallbackRegionSize(JvmInfo jvmInfo) { + // Estimates the G1 region size from the max heap of the supplied jvmInfo, for when the JVM did not report one. + // Mimics the JDK calculation: heap size / 2048, rounded down to a power of two and clamped to [1MB, 32MB]. + // The JDK 14 source averages initial and max heap: + // https://hg.openjdk.java.net/jdk/jdk14/file/6c954123ee8d/src/hotspot/share/gc/g1/heapRegion.cpp#l65 + // newer JDKs have a slight variant only considering max-heap, which is the variant computed here: + // https://hg.openjdk.java.net/jdk/jdk/file/e7d0ec2d06e8/src/hotspot/share/gc/g1/heapRegion.cpp#l67 + // based on this JDK "bug": + // https://bugs.openjdk.java.net/browse/JDK-8241670 + // Only the jvmInfo argument is consulted (not the global JvmInfo.jvmInfo()), so an injected instance is honoured. + long maxHeapSize = jvmInfo.getMem().getHeapMax().getBytes(); + long regionSize = Long.highestOneBit(maxHeapSize / 2048); + if (regionSize < ByteSizeUnit.MB.toBytes(1)) { + regionSize = ByteSizeUnit.MB.toBytes(1); + } else if (regionSize > ByteSizeUnit.MB.toBytes(32)) { + regionSize = ByteSizeUnit.MB.toBytes(32); + } + return regionSize; + } + + @Override + public MemoryUsage overLimit(MemoryUsage memoryUsed) { + boolean leader = false; + int allocationIndex = 0; + long allocationDuration = 0; + try (ReleasableLock locked = lock.tryAcquire(lockTimeout)) { + if (locked != null) { + long begin = timeSupplier.getAsLong(); + leader = begin >= lastCheckTime + minimumInterval; + overLimitTriggered(leader); + if (leader) { + long initialCollectionCount = gcCountSupplier.getAsLong(); + logger.info("attempting to trigger G1GC due to high heap usage [{}]", memoryUsed.baseUsage); + long localBlackHole = 0; + // number of allocations, corresponding to (approximately) number of free regions + 1 + int allocationCount = Math.toIntExact((maxHeap - memoryUsed.baseUsage) / g1RegionSize + 1); + // allocations of half-region size becomes single humongous alloc, thus taking up a full region. 
+ int allocationSize = (int) (g1RegionSize >> 1); + long maxUsageObserved = memoryUsed.baseUsage; + for (; allocationIndex < allocationCount; ++allocationIndex) { + long current = currentMemoryUsageSupplier.getAsLong(); + if (current >= maxUsageObserved) { + maxUsageObserved = current; + } else { + // we observed a memory drop, so some GC must have occurred + break; + } + if (initialCollectionCount != gcCountSupplier.getAsLong()) { + break; + } + localBlackHole += new byte[allocationSize].hashCode(); + } + + blackHole += localBlackHole; + logger.trace("black hole [{}]", blackHole); + + long now = timeSupplier.getAsLong(); + this.lastCheckTime = now; + allocationDuration = now - begin; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // fallthrough + } + + final long current = currentMemoryUsageSupplier.getAsLong(); + if (current < memoryUsed.baseUsage) { + if (leader) { + logger.info("GC did bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]", + memoryUsed.baseUsage, current, allocationIndex, allocationDuration); + } + return new MemoryUsage(current, memoryUsed.totalUsage - memoryUsed.baseUsage + current, + memoryUsed.transientChildUsage, memoryUsed.permanentChildUsage); + } else { + if (leader) { + logger.info("GC did not bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]", + memoryUsed.baseUsage, current, allocationIndex, allocationDuration); + } + // prefer original measurement when reporting if heap usage was not brought down. + return memoryUsed; + } + } + + void overLimitTriggered(boolean leader) { + // for tests to override. 
+ } + + TimeValue getLockTimeout() { + return lockTimeout; + } + } } diff --git a/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java b/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java index 534534c799a..c02e08f3a5e 100644 --- a/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java +++ b/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java @@ -98,6 +98,7 @@ public class JvmInfo implements ReportingService.Info { String onOutOfMemoryError = null; String useCompressedOops = "unknown"; String useG1GC = "unknown"; + long g1RegionSize = -1; String useSerialGC = "unknown"; long configuredInitialHeapSize = -1; long configuredMaxHeapSize = -1; @@ -130,6 +131,8 @@ public class JvmInfo implements ReportingService.Info { try { Object useG1GCVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC"); useG1GC = (String) valueMethod.invoke(useG1GCVmOptionObject); + Object regionSizeVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "G1HeapRegionSize"); + g1RegionSize = Long.parseLong((String) valueMethod.invoke(regionSizeVmOptionObject)); } catch (Exception ignored) { } @@ -161,9 +164,11 @@ public class JvmInfo implements ReportingService.Info { INSTANCE = new JvmInfo(JvmPid.getPid(), System.getProperty("java.version"), runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), bundledJdk, usingBundledJdk, runtimeMXBean.getStartTime(), configuredInitialHeapSize, configuredMaxHeapSize, mem, inputArguments, bootClassPath, classPath, systemProperties, gcCollectors, memoryPools, onError, - onOutOfMemoryError, useCompressedOops, useG1GC, useSerialGC); + onOutOfMemoryError, useCompressedOops, useG1GC, useSerialGC, + g1RegionSize); } + @SuppressForbidden(reason = "PathUtils#get") private static boolean usingBundledJdk() { /* @@ -210,12 +215,13 @@ public class JvmInfo implements ReportingService.Info { private final String useCompressedOops; private final String useG1GC; private 
final String useSerialGC; + private final long g1RegionSize; private JvmInfo(long pid, String version, String vmName, String vmVersion, String vmVendor, boolean bundledJdk, Boolean usingBundledJdk, long startTime, long configuredInitialHeapSize, long configuredMaxHeapSize, Mem mem, String[] inputArguments, String bootClassPath, String classPath, Map systemProperties, String[] gcCollectors, String[] memoryPools, String onError, String onOutOfMemoryError, String useCompressedOops, String useG1GC, - String useSerialGC) { + String useSerialGC, long g1RegionSize) { this.pid = pid; this.version = version; this.vmName = vmName; @@ -238,6 +244,7 @@ public class JvmInfo implements ReportingService.Info { this.useCompressedOops = useCompressedOops; this.useG1GC = useG1GC; this.useSerialGC = useSerialGC; + this.g1RegionSize = g1RegionSize; } public JvmInfo(StreamInput in) throws IOException { @@ -272,6 +279,7 @@ public class JvmInfo implements ReportingService.Info { this.onOutOfMemoryError = null; this.useG1GC = "unknown"; this.useSerialGC = "unknown"; + this.g1RegionSize = -1; } @Override @@ -467,6 +475,10 @@ public class JvmInfo implements ReportingService.Info { return this.useSerialGC; } + public long getG1RegionSize() { + return g1RegionSize; + } + public String[] getGcCollectors() { return gcCollectors; } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java index 6a303449ce1..e7ebbbf102e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java @@ -20,13 +20,25 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; import java.util.List; 
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class ReleasableLockTests extends ESTestCase { @@ -79,10 +91,10 @@ public class ReleasableLockTests extends ESTestCase { } private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) { - try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) { + try (@SuppressWarnings("unused") Releasable outer = randomAcquireMethod(lockToAcquire)) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); - try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) { + try (@SuppressWarnings("unused") Releasable inner = randomAcquireMethod(lockToAcquire)) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } @@ -94,4 +106,56 @@ public class ReleasableLockTests extends ESTestCase { assertFalse(otherLock.isHeldByCurrentThread()); } + private ReleasableLock randomAcquireMethod(ReleasableLock lock) { + if (randomBoolean()) { + return lock.acquire(); + } else { + try { + ReleasableLock releasableLock = lock.tryAcquire(TimeValue.timeValueSeconds(30)); + assertThat(releasableLock, notNullValue()); + return releasableLock; + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + } + + public void testTryAcquire() throws Exception { + ReleasableLock lock = new ReleasableLock(new ReentrantLock()); + int 
numberOfThreads = randomIntBetween(1, 10); + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + AtomicInteger lockedCounter = new AtomicInteger(); + int timeout = randomFrom(0, 5, 10); + List threads = + IntStream.range(0, numberOfThreads).mapToObj(i -> new Thread(() -> { + try { + barrier.await(10, TimeUnit.SECONDS); + try (ReleasableLock locked = lock.tryAcquire(TimeValue.timeValueMillis(timeout))) { + if (locked != null) { + lockedCounter.incrementAndGet(); + } + } + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError(e); + } + })).collect(Collectors.toList()); + threads.forEach(Thread::start); + try (ReleasableLock locked = randomBoolean() ? lock.acquire() : null) { + barrier.await(10, TimeUnit.SECONDS); + for (Thread thread : threads) { + thread.join(10000); + } + threads.forEach(t -> assertThat(t.isAlive(), is(false))); + + if (locked != null) { + assertThat(lockedCounter.get(), equalTo(0)); + } else { + assertThat(lockedCounter.get(), greaterThanOrEqualTo(1)); + } + } + + try (ReleasableLock locked = lock.tryAcquire(TimeValue.ZERO)) { + assertThat(locked, notNullValue()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 9bcfcf25cff..eaf5d693131 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -27,22 +27,38 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.monitor.jvm.JvmInfo; import 
org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class HierarchyCircuitBreakerServiceTests extends ESTestCase { @@ -270,6 +286,337 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { assertEquals(0, requestBreaker.getTrippedCount()); } + /** + * "Integration test" checking that the G1 over limit strategy is consulted before the parent breaker trips. + * Given that it depends on GC, the main assertion that we do not get a circuit breaking exception in the threads towards + * the end of the test is not enabled. The following tests check this in a more unit-test style. 
+ */ + public void testParentTriggersG1GCBeforeBreaking() throws InterruptedException, TimeoutException, BrokenBarrierException { + assumeTrue("Only G1GC can utilize the over limit check", JvmInfo.jvmInfo().useG1GC().equals("true")); + long g1RegionSize = JvmInfo.jvmInfo().getG1RegionSize(); + assumeTrue("Must have region size", g1RegionSize > 0); + + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE) + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%") + .build(); + + AtomicInteger leaderTriggerCount = new AtomicInteger(); + AtomicReference> onOverLimit = new AtomicReference<>(leader -> {}); + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + long interval = randomLongBetween(1, 1000); + final HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + trackRealMemoryUsage -> new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), + HierarchyCircuitBreakerService::realMemoryUsage, + HierarchyCircuitBreakerService.createYoungGcCountSupplier(), time::get, interval, TimeValue.timeValueSeconds(30)) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } + onOverLimit.get().accept(leader); + } + }); + + long maxHeap = JvmInfo.jvmInfo().getConfiguredMaxHeapSize(); + int regionCount = Math.toIntExact((maxHeap / 2 + g1RegionSize - 1) / g1RegionSize); + + // First setup a host of large byte[]'s, must be Humongous objects since those are cleaned during a young phase (no concurrent cycle + // necessary, which is hard to control in the test). 
+ List data = new ArrayList<>(); + for (int i = 0; i < regionCount; ++i) { + data.add(new byte[(int) (JvmInfo.jvmInfo().getG1RegionSize() / 2)]); + } + try { + service.checkParentLimit(0, "test"); + fail("must exceed memory limit"); + } catch (CircuitBreakingException e) { + // OK + } + + time.addAndGet(randomLongBetween(interval, interval + 10)); + onOverLimit.set(leader -> { + if (leader) { + data.clear(); + } + }); + + logger.trace("black hole [{}]", data.hashCode()); + + int threadCount = randomIntBetween(1, 10); + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); + List threads = new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; ++i) { + threads.add( + new Thread(() -> { + try { + barrier.await(10, TimeUnit.SECONDS); + service.checkParentLimit(0, "test-thread"); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError(e); + } catch (CircuitBreakingException e) { + // very rare + logger.info("Thread got semi-unexpected circuit breaking exception", e); + } + })); + } + + threads.forEach(Thread::start); + barrier.await(20, TimeUnit.SECONDS); + + for (Thread thread : threads) { + thread.join(10000); + } + threads.forEach(thread -> assertFalse(thread.isAlive())); + + assertThat(leaderTriggerCount.get(), equalTo(2)); + } + + public void testParentDoesOverLimitCheck() { + long g1RegionSize = JvmInfo.jvmInfo().getG1RegionSize(); + + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE) + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%") + .build(); + boolean saveTheDay = randomBoolean(); + AtomicBoolean overLimitTriggered = new AtomicBoolean(); + final HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + 
trackRealMemoryUsage -> + memoryUsed -> { + assertTrue(overLimitTriggered.compareAndSet(false, true)); + if (saveTheDay) { + return new HierarchyCircuitBreakerService.MemoryUsage(memoryUsed.baseUsage / 2, + memoryUsed.totalUsage - (memoryUsed.baseUsage / 2), memoryUsed.transientChildUsage, + memoryUsed.permanentChildUsage); + } else { + return memoryUsed; + } + } + ); + + int allocationSize = g1RegionSize > 0 ? (int) (g1RegionSize / 2) : 1024 * 1024; + int allocationCount = (int) (JvmInfo.jvmInfo().getConfiguredMaxHeapSize() / allocationSize) + 1; + List data = new ArrayList<>(); + try { + for (int i = 0 ; i < allocationCount && overLimitTriggered.get() == false; ++i) { + data.add(new byte[allocationSize]); + service.checkParentLimit(0, "test"); + } + assertTrue(saveTheDay); + } catch (CircuitBreakingException e) { + assertFalse(saveTheDay); + } + + logger.trace("black hole [{}]", data.hashCode()); + } + + public void testFallbackG1RegionSize() { + assumeTrue("Only G1GC can utilize the over limit check", JvmInfo.jvmInfo().useG1GC().equals("true")); + assumeTrue("Must have region size", JvmInfo.jvmInfo().getG1RegionSize() > 0); + + assertThat(HierarchyCircuitBreakerService.G1OverLimitStrategy.fallbackRegionSize(JvmInfo.jvmInfo()), + equalTo(JvmInfo.jvmInfo().getG1RegionSize())); + } + + public void testG1OverLimitStrategyBreakOnMemory() { + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + AtomicInteger leaderTriggerCount = new AtomicInteger(); + AtomicInteger nonLeaderTriggerCount = new AtomicInteger(); + long interval = randomLongBetween(1, 1000); + AtomicLong memoryUsage = new AtomicLong(); + + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), memoryUsage::get, () -> 0, time::get, interval, + TimeValue.timeValueSeconds(30)) { + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); 
+ } else { + nonLeaderTriggerCount.incrementAndGet(); + } + } + }; + memoryUsage.set(randomLongBetween(100, 110)); + HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, randomLongBetween(100, 110), + randomLongBetween(0, 50), + randomLongBetween(0, 50)); + + assertThat(strategy.overLimit(input), sameInstance(input)); + assertThat(leaderTriggerCount.get(), equalTo(1)); + + memoryUsage.set(99); + HierarchyCircuitBreakerService.MemoryUsage output = strategy.overLimit(input); + assertThat(output, not(sameInstance(input))); + assertThat(output.baseUsage, equalTo(memoryUsage.get())); + assertThat(output.totalUsage, equalTo(99 + input.totalUsage - 100)); + assertThat(output.transientChildUsage, equalTo(input.transientChildUsage)); + assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage)); + assertThat(nonLeaderTriggerCount.get(), equalTo(1)); + + time.addAndGet(randomLongBetween(interval, interval * 2)); + output = strategy.overLimit(input); + assertThat(output, not(sameInstance(input))); + assertThat(output.baseUsage, equalTo(memoryUsage.get())); + assertThat(output.totalUsage, equalTo(99 + input.totalUsage - 100)); + assertThat(output.transientChildUsage, equalTo(input.transientChildUsage)); + assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage)); + assertThat(leaderTriggerCount.get(), equalTo(2)); + } + + public void testG1OverLimitStrategyBreakOnGcCount() { + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + AtomicInteger leaderTriggerCount = new AtomicInteger(); + AtomicInteger nonLeaderTriggerCount = new AtomicInteger(); + long interval = randomLongBetween(1, 1000); + AtomicLong memoryUsageCounter = new AtomicLong(); + AtomicLong gcCounter = new AtomicLong(); + LongSupplier memoryUsageSupplier = () -> { + memoryUsageCounter.incrementAndGet(); + return randomLongBetween(100, 110); + }; + HierarchyCircuitBreakerService.G1OverLimitStrategy 
strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), + memoryUsageSupplier, + gcCounter::incrementAndGet, + time::get, interval, TimeValue.timeValueSeconds(30)) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } else { + nonLeaderTriggerCount.incrementAndGet(); + } + } + }; + HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, randomLongBetween(100, 110), + randomLongBetween(0, 50), + randomLongBetween(0, 50)); + + assertThat(strategy.overLimit(input), sameInstance(input)); + assertThat(leaderTriggerCount.get(), equalTo(1)); + assertThat(gcCounter.get(), equalTo(2L)); + assertThat(memoryUsageCounter.get(), equalTo(2L)); // 1 before gc count break and 1 to get resulting memory usage. + } + + public void testG1OverLimitStrategyThrottling() throws InterruptedException, BrokenBarrierException, TimeoutException { + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + AtomicInteger leaderTriggerCount = new AtomicInteger(); + long interval = randomLongBetween(1, 1000); + AtomicLong memoryUsage = new AtomicLong(); + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), memoryUsage::get, () -> 0, + time::get, interval, TimeValue.timeValueSeconds(30)) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } + } + }; + + int threadCount = randomIntBetween(1, 10); + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); + AtomicReference countDown = new AtomicReference<>(new CountDownLatch(randomIntBetween(1, 20))); + List threads = IntStream.range(0, threadCount) + .mapToObj(i -> new Thread(() -> { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new 
AssertionError(e); + } + do { + HierarchyCircuitBreakerService.MemoryUsage input = + new HierarchyCircuitBreakerService.MemoryUsage(randomLongBetween(0, 100), randomLongBetween(0, 100), + randomLongBetween(0, 100), randomLongBetween(0, 100)); + HierarchyCircuitBreakerService.MemoryUsage output = strategy.overLimit(input); + assertThat(output.totalUsage, equalTo(output.baseUsage + input.totalUsage - input.baseUsage)); + assertThat(output.transientChildUsage, equalTo(input.transientChildUsage)); + assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage)); + countDown.get().countDown(); + } while (!Thread.interrupted()); + })).collect(Collectors.toList()); + + threads.forEach(Thread::start); + barrier.await(20, TimeUnit.SECONDS); + + int iterationCount = randomIntBetween(1, 5); + for (int i = 0; i < iterationCount; ++i) { + memoryUsage.set(randomLongBetween(0, 100)); + assertTrue(countDown.get().await(20, TimeUnit.SECONDS)); + assertThat(leaderTriggerCount.get(), lessThanOrEqualTo(i + 1)); + assertThat(leaderTriggerCount.get(), greaterThanOrEqualTo(i / 2 + 1)); + time.addAndGet(randomLongBetween(interval, interval * 2)); + countDown.set(new CountDownLatch(randomIntBetween(1, 20))); + } + + threads.forEach(Thread::interrupt); + for (Thread thread : threads) { + thread.join(10000); + } + threads.forEach(thread -> assertFalse(thread.isAlive())); + } + + public void testCreateOverLimitStrategy() { + assertThat(HierarchyCircuitBreakerService.createOverLimitStrategy(false), + not(instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class))); + HierarchyCircuitBreakerService.OverLimitStrategy overLimitStrategy = HierarchyCircuitBreakerService.createOverLimitStrategy(true); + if (JvmInfo.jvmInfo().useG1GC().equals("true")) { + assertThat(overLimitStrategy, instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class)); + assertThat(((HierarchyCircuitBreakerService.G1OverLimitStrategy) overLimitStrategy).getLockTimeout(), + 
equalTo(TimeValue.timeValueMillis(500))); + } else { + assertThat(overLimitStrategy, not(instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class))); + } + } + + public void testG1LockTimeout() throws Exception { + CountDownLatch startedBlocking = new CountDownLatch(1); + CountDownLatch blockingUntil = new CountDownLatch(1); + AtomicLong gcCounter = new AtomicLong(); + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), () -> 100, gcCounter::incrementAndGet, + () -> 0, 1, TimeValue.timeValueMillis(randomFrom(0, 5, 10))) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + startedBlocking.countDown(); + try { + // this is the central assertion - the overLimit call below should complete in a timely manner. + assertThat(blockingUntil.await(10, TimeUnit.SECONDS), is(true)); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + } + }; + + HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, 100, 0, 0); + Thread blocker = new Thread(() -> { + strategy.overLimit(input); + }); + blocker.start(); + try { + assertThat(startedBlocking.await(10, TimeUnit.SECONDS), is(true)); + + // this should complete in a timely manner, verified by the assertion in the thread. + assertThat(strategy.overLimit(input), sameInstance(input)); + } finally { + blockingUntil.countDown(); + blocker.join(10000); + assertThat(blocker.isAlive(), is(false)); + } + } + public void testTrippedCircuitBreakerDurability() { Settings clusterSettings = Settings.builder() .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.FALSE)