From adf6083dd09ff69e34f952bd420b42b6f41bb4b0 Mon Sep 17 00:00:00 2001 From: Henning Andersen <33268011+henningandersen@users.noreply.github.com> Date: Mon, 13 Jul 2020 17:41:09 +0200 Subject: [PATCH] Enhance real memory circuit breaker with G1 GC (#58674) (#59394) Using G1 GC, Elasticsearch can rarely trigger that heap usage goes above the real memory circuit breaker limit and stays there for an extended period. This situation will persist until the next young GC. The circuit breaking itself hinders that from occurring in a timely manner since it breaks all request before real work is done. This commit gently nudges G1 to do a young GC and then double checks that heap usage is still above the real memory circuit breaker limit before throwing the circuit breaker exception. Related to #57202 --- .../util/concurrent/ReleasableLock.java | 14 + .../HierarchyCircuitBreakerService.java | 186 +++++++++- .../elasticsearch/monitor/jvm/JvmInfo.java | 16 +- .../util/concurrent/ReleasableLockTests.java | 68 +++- .../HierarchyCircuitBreakerServiceTests.java | 347 ++++++++++++++++++ 5 files changed, 625 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java index 9cc5cf7bd81..24440807449 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.EngineException; import java.util.concurrent.locks.Lock; @@ -57,6 +58,19 @@ public class ReleasableLock implements Releasable { return this; } + /** + * Try acquiring lock, returning null if unable to acquire lock within timeout. + */ + public ReleasableLock tryAcquire(TimeValue timeout) throws InterruptedException { + boolean locked = lock.tryLock(timeout.duration(), timeout.timeUnit()); + if (locked) { + assert addCurrentThread(); + return this; + } else { + return null; + } + } + private boolean addCurrentThread() { final Integer current = holdingThreads.get(); holdingThreads.set(current == null ? 1 : current + 1); diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 86c407f8040..1a9648f591d 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -22,6 +22,7 @@ package org.elasticsearch.indices.breaker; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -30,8 +31,14 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.monitor.jvm.GcNames; +import org.elasticsearch.monitor.jvm.JvmInfo; +import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.util.ArrayList; @@ -40,6 +47,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING; @@ -104,7 +114,14 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { // Tripped count for when redistribution was attempted but wasn't successful private final AtomicLong parentTripCount = new AtomicLong(0); + private final OverLimitStrategy overLimitStrategy; + public HierarchyCircuitBreakerService(Settings settings, List customBreakers, ClusterSettings clusterSettings) { + this(settings, customBreakers, clusterSettings, HierarchyCircuitBreakerService::createOverLimitStrategy); + } + + HierarchyCircuitBreakerService(Settings settings, List customBreakers, ClusterSettings clusterSettings, + Function overLimitStrategyFactory) { super(); HashMap childCircuitBreakers = new HashMap<>(); childCircuitBreakers.put(CircuitBreaker.FIELDDATA, validateAndCreateBreaker( @@ -168,6 +185,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { CIRCUIT_BREAKER_OVERHEAD_SETTING, (name, updatedValues) -> updateCircuitBreakerSettings(name, updatedValues.v1(), updatedValues.v2()), (s, t) -> {}); + + this.overLimitStrategy = overLimitStrategyFactory.apply(this.trackRealMemoryUsage); } private void updateCircuitBreakerSettings(String name, ByteSizeValue newLimit, Double newOverhead) { @@ -231,7 +250,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { breaker.getTrippedCount()); } - private static class MemoryUsage { + static class MemoryUsage { final long baseUsage; final long totalUsage; final long transientChildUsage; @@ -268,6 +287,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { //package private to allow overriding it in tests long currentMemoryUsage() { + return realMemoryUsage(); + } + + static long realMemoryUsage() { try { return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); } catch (IllegalArgumentException ex) { @@ -290,7 +313,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { final MemoryUsage memoryUsed = memoryUsed(newBytesReserved); long parentLimit = this.parentSettings.getLimit(); - if (memoryUsed.totalUsage > parentLimit) { + if (memoryUsed.totalUsage > parentLimit && overLimitStrategy.overLimit(memoryUsed).totalUsage > parentLimit) { this.parentTripCount.incrementAndGet(); final StringBuilder message = new StringBuilder("[parent] Data too large, data for [" + label + "]" + " would be [" + memoryUsed.totalUsage + "/" + new ByteSizeValue(memoryUsed.totalUsage) + "]" + @@ -334,4 +357,163 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { this, breakerSettings.getName()); } + + static OverLimitStrategy createOverLimitStrategy(boolean trackRealMemoryUsage) { + JvmInfo jvmInfo = JvmInfo.jvmInfo(); + if (trackRealMemoryUsage && jvmInfo.useG1GC().equals("true") + // messing with GC is "dangerous" so we apply an escape hatch. Not intended to be used. + && Booleans.parseBoolean(System.getProperty("es.real_memory_circuit_breaker.g1_over_limit_strategy.enabled"), true)) { + TimeValue lockTimeout = TimeValue.timeValueMillis( + Integer.parseInt(System.getProperty("es.real_memory_circuit_breaker.g1_over_limit_strategy.lock_timeout_ms", "500")) + ); + // hardcode interval, do not want any tuning of it outside code changes. + return new G1OverLimitStrategy(jvmInfo, HierarchyCircuitBreakerService::realMemoryUsage, createYoungGcCountSupplier(), + System::currentTimeMillis, 5000, lockTimeout); + } else { + return memoryUsed -> memoryUsed; + } + } + + static LongSupplier createYoungGcCountSupplier() { + List youngBeans = + ManagementFactory.getGarbageCollectorMXBeans().stream() + .filter(mxBean -> GcNames.getByGcName(mxBean.getName(), mxBean.getName()).equals(GcNames.YOUNG)) + .collect(Collectors.toList()); + assert youngBeans.size() == 1; + assert youngBeans.get(0).getCollectionCount() != -1 : "G1 must support getting collection count"; + + if (youngBeans.size() == 1) { + return youngBeans.get(0)::getCollectionCount; + } else { + logger.warn("Unable to find young generation collector, G1 over limit strategy might be impacted [{}]", youngBeans); + return () -> -1; + } + } + + interface OverLimitStrategy { + MemoryUsage overLimit(MemoryUsage memoryUsed); + } + + static class G1OverLimitStrategy implements OverLimitStrategy { + private final long g1RegionSize; + private final LongSupplier currentMemoryUsageSupplier; + private final LongSupplier gcCountSupplier; + private final LongSupplier timeSupplier; + private final TimeValue lockTimeout; + private final long maxHeap; + + private long lastCheckTime = Long.MIN_VALUE; + private final long minimumInterval; + + private long blackHole; + private final ReleasableLock lock = new ReleasableLock(new ReentrantLock()); + + G1OverLimitStrategy(JvmInfo jvmInfo, LongSupplier currentMemoryUsageSupplier, + LongSupplier gcCountSupplier, + LongSupplier timeSupplier, long minimumInterval, TimeValue lockTimeout) { + this.lockTimeout = lockTimeout; + assert minimumInterval > 0; + this.currentMemoryUsageSupplier = currentMemoryUsageSupplier; + this.gcCountSupplier = gcCountSupplier; + this.timeSupplier = timeSupplier; + this.minimumInterval = minimumInterval; + this.maxHeap = jvmInfo.getMem().getHeapMax().getBytes(); + long g1RegionSize = jvmInfo.getG1RegionSize(); + if (g1RegionSize <= 0) { + this.g1RegionSize = fallbackRegionSize(jvmInfo); + } else { + this.g1RegionSize = g1RegionSize; + } + } + + static long fallbackRegionSize(JvmInfo jvmInfo) { + // mimick JDK calculation based on JDK 14 source: + // https://hg.openjdk.java.net/jdk/jdk14/file/6c954123ee8d/src/hotspot/share/gc/g1/heapRegion.cpp#l65 + // notice that newer JDKs will have a slight variant only considering max-heap: + // https://hg.openjdk.java.net/jdk/jdk/file/e7d0ec2d06e8/src/hotspot/share/gc/g1/heapRegion.cpp#l67 + // based on this JDK "bug": + // https://bugs.openjdk.java.net/browse/JDK-8241670 + long averageHeapSize = + (jvmInfo.getMem().getHeapMax().getBytes() + JvmInfo.jvmInfo().getMem().getHeapMax().getBytes()) / 2; + long regionSize = Long.highestOneBit(averageHeapSize / 2048); + if (regionSize < ByteSizeUnit.MB.toBytes(1)) { + regionSize = ByteSizeUnit.MB.toBytes(1); + } else if (regionSize > ByteSizeUnit.MB.toBytes(32)) { + regionSize = ByteSizeUnit.MB.toBytes(32); + } + return regionSize; + } + + @Override + public MemoryUsage overLimit(MemoryUsage memoryUsed) { + boolean leader = false; + int allocationIndex = 0; + long allocationDuration = 0; + try (ReleasableLock locked = lock.tryAcquire(lockTimeout)) { + if (locked != null) { + long begin = timeSupplier.getAsLong(); + leader = begin >= lastCheckTime + minimumInterval; + overLimitTriggered(leader); + if (leader) { + long initialCollectionCount = gcCountSupplier.getAsLong(); + logger.info("attempting to trigger G1GC due to high heap usage [{}]", memoryUsed.baseUsage); + long localBlackHole = 0; + // number of allocations, corresponding to (approximately) number of free regions + 1 + int allocationCount = Math.toIntExact((maxHeap - memoryUsed.baseUsage) / g1RegionSize + 1); + // allocations of half-region size becomes single humongous alloc, thus taking up a full region. + int allocationSize = (int) (g1RegionSize >> 1); + long maxUsageObserved = memoryUsed.baseUsage; + for (; allocationIndex < allocationCount; ++allocationIndex) { + long current = currentMemoryUsageSupplier.getAsLong(); + if (current >= maxUsageObserved) { + maxUsageObserved = current; + } else { + // we observed a memory drop, so some GC must have occurred + break; + } + if (initialCollectionCount != gcCountSupplier.getAsLong()) { + break; + } + localBlackHole += new byte[allocationSize].hashCode(); + } + + blackHole += localBlackHole; + logger.trace("black hole [{}]", blackHole); + + long now = timeSupplier.getAsLong(); + this.lastCheckTime = now; + allocationDuration = now - begin; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // fallthrough + } + + final long current = currentMemoryUsageSupplier.getAsLong(); + if (current < memoryUsed.baseUsage) { + if (leader) { + logger.info("GC did bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]", + memoryUsed.baseUsage, current, allocationIndex, allocationDuration); + } + return new MemoryUsage(current, memoryUsed.totalUsage - memoryUsed.baseUsage + current, + memoryUsed.transientChildUsage, memoryUsed.permanentChildUsage); + } else { + if (leader) { + logger.info("GC did not bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]", + memoryUsed.baseUsage, current, allocationIndex, allocationDuration); + } + // prefer original measurement when reporting if heap usage was not brought down. + return memoryUsed; + } + } + + void overLimitTriggered(boolean leader) { + // for tests to override. + } + + TimeValue getLockTimeout() { + return lockTimeout; + } + } } diff --git a/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java b/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java index 534534c799a..c02e08f3a5e 100644 --- a/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java +++ b/server/src/main/java/org/elasticsearch/monitor/jvm/JvmInfo.java @@ -98,6 +98,7 @@ public class JvmInfo implements ReportingService.Info { String onOutOfMemoryError = null; String useCompressedOops = "unknown"; String useG1GC = "unknown"; + long g1RegisionSize = -1; String useSerialGC = "unknown"; long configuredInitialHeapSize = -1; long configuredMaxHeapSize = -1; @@ -130,6 +131,8 @@ public class JvmInfo implements ReportingService.Info { try { Object useG1GCVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC"); useG1GC = (String) valueMethod.invoke(useG1GCVmOptionObject); + Object regionSizeVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "G1HeapRegionSize"); + g1RegisionSize = Long.parseLong((String) valueMethod.invoke(regionSizeVmOptionObject)); } catch (Exception ignored) { } @@ -161,9 +164,11 @@ public class JvmInfo implements ReportingService.Info { INSTANCE = new JvmInfo(JvmPid.getPid(), System.getProperty("java.version"), runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), bundledJdk, usingBundledJdk, runtimeMXBean.getStartTime(), configuredInitialHeapSize, configuredMaxHeapSize, mem, inputArguments, bootClassPath, classPath, systemProperties, gcCollectors, memoryPools, onError, - onOutOfMemoryError, useCompressedOops, useG1GC, useSerialGC); + onOutOfMemoryError, useCompressedOops, useG1GC, useSerialGC, + g1RegisionSize); } + @SuppressForbidden(reason = "PathUtils#get") private static boolean usingBundledJdk() { /* @@ -210,12 +215,13 @@ public class JvmInfo implements ReportingService.Info { private final String useCompressedOops; private final String useG1GC; private final String useSerialGC; + private final long g1RegionSize; private JvmInfo(long pid, String version, String vmName, String vmVersion, String vmVendor, boolean bundledJdk, Boolean usingBundledJdk, long startTime, long configuredInitialHeapSize, long configuredMaxHeapSize, Mem mem, String[] inputArguments, String bootClassPath, String classPath, Map systemProperties, String[] gcCollectors, String[] memoryPools, String onError, String onOutOfMemoryError, String useCompressedOops, String useG1GC, - String useSerialGC) { + String useSerialGC, long g1RegionSize) { this.pid = pid; this.version = version; this.vmName = vmName; @@ -238,6 +244,7 @@ public class JvmInfo implements ReportingService.Info { this.useCompressedOops = useCompressedOops; this.useG1GC = useG1GC; this.useSerialGC = useSerialGC; + this.g1RegionSize = g1RegionSize; } public JvmInfo(StreamInput in) throws IOException { @@ -272,6 +279,7 @@ public class JvmInfo implements ReportingService.Info { this.onOutOfMemoryError = null; this.useG1GC = "unknown"; this.useSerialGC = "unknown"; + this.g1RegionSize = -1; } @Override @@ -467,6 +475,10 @@ public class JvmInfo implements ReportingService.Info { return this.useSerialGC; } + public long getG1RegionSize() { + return g1RegionSize; + } + public String[] getGcCollectors() { return gcCollectors; } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java index 6a303449ce1..e7ebbbf102e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java @@ -20,13 +20,25 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class ReleasableLockTests extends ESTestCase { @@ -79,10 +91,10 @@ public class ReleasableLockTests extends ESTestCase { } private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) { - try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) { + try (@SuppressWarnings("unused") Releasable outer = randomAcquireMethod(lockToAcquire)) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); - try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) { + try (@SuppressWarnings("unused") Releasable inner = randomAcquireMethod(lockToAcquire)) { assertTrue(lockToAcquire.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread()); } @@ -94,4 +106,56 @@ public class ReleasableLockTests extends ESTestCase { assertFalse(otherLock.isHeldByCurrentThread()); } + private ReleasableLock randomAcquireMethod(ReleasableLock lock) { + if (randomBoolean()) { + return lock.acquire(); + } else { + try { + ReleasableLock releasableLock = lock.tryAcquire(TimeValue.timeValueSeconds(30)); + assertThat(releasableLock, notNullValue()); + return releasableLock; + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + } + + public void testTryAcquire() throws Exception { + ReleasableLock lock = new ReleasableLock(new ReentrantLock()); + int numberOfThreads = randomIntBetween(1, 10); + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + AtomicInteger lockedCounter = new AtomicInteger(); + int timeout = randomFrom(0, 5, 10); + List threads = + IntStream.range(0, numberOfThreads).mapToObj(i -> new Thread(() -> { + try { + barrier.await(10, TimeUnit.SECONDS); + try (ReleasableLock locked = lock.tryAcquire(TimeValue.timeValueMillis(timeout))) { + if (locked != null) { + lockedCounter.incrementAndGet(); + } + } + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError(e); + } + })).collect(Collectors.toList()); + threads.forEach(Thread::start); + try (ReleasableLock locked = randomBoolean() ? lock.acquire() : null) { + barrier.await(10, TimeUnit.SECONDS); + for (Thread thread : threads) { + thread.join(10000); + } + threads.forEach(t -> assertThat(t.isAlive(), is(false))); + + if (locked != null) { + assertThat(lockedCounter.get(), equalTo(0)); + } else { + assertThat(lockedCounter.get(), greaterThanOrEqualTo(1)); + } + } + + try (ReleasableLock locked = lock.tryAcquire(TimeValue.ZERO)) { + assertThat(locked, notNullValue()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 9bcfcf25cff..eaf5d693131 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -27,22 +27,38 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class HierarchyCircuitBreakerServiceTests extends ESTestCase { @@ -270,6 +286,337 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { assertEquals(0, requestBreaker.getTrippedCount()); } + /** + * "Integration test" checking that we ask the G1 over limit check before parent breaking. + * Given that it depends on GC, the main assertion that we do not get a circuit breaking exception in the threads towards + * the end of the test is not enabled. The following tests checks this in more unit test style. + */ + public void testParentTriggersG1GCBeforeBreaking() throws InterruptedException, TimeoutException, BrokenBarrierException { + assumeTrue("Only G1GC can utilize the over limit check", JvmInfo.jvmInfo().useG1GC().equals("true")); + long g1RegionSize = JvmInfo.jvmInfo().getG1RegionSize(); + assumeTrue("Must have region size", g1RegionSize > 0); + + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE) + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%") + .build(); + + AtomicInteger leaderTriggerCount = new AtomicInteger(); + AtomicReference> onOverLimit = new AtomicReference<>(leader -> {}); + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + long interval = randomLongBetween(1, 1000); + final HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + trackRealMemoryUsage -> new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), + HierarchyCircuitBreakerService::realMemoryUsage, + HierarchyCircuitBreakerService.createYoungGcCountSupplier(), time::get, interval, TimeValue.timeValueSeconds(30)) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } + onOverLimit.get().accept(leader); + } + }); + + long maxHeap = JvmInfo.jvmInfo().getConfiguredMaxHeapSize(); + int regionCount = Math.toIntExact((maxHeap / 2 + g1RegionSize - 1) / g1RegionSize); + + // First setup a host of large byte[]'s, must be Humongous objects since those are cleaned during a young phase (no concurrent cycle + // necessary, which is hard to control in the test). + List data = new ArrayList<>(); + for (int i = 0; i < regionCount; ++i) { + data.add(new byte[(int) (JvmInfo.jvmInfo().getG1RegionSize() / 2)]); + } + try { + service.checkParentLimit(0, "test"); + fail("must exceed memory limit"); + } catch (CircuitBreakingException e) { + // OK + } + + time.addAndGet(randomLongBetween(interval, interval + 10)); + onOverLimit.set(leader -> { + if (leader) { + data.clear(); + } + }); + + logger.trace("black hole [{}]", data.hashCode()); + + int threadCount = randomIntBetween(1, 10); + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); + List threads = new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; ++i) { + threads.add( + new Thread(() -> { + try { + barrier.await(10, TimeUnit.SECONDS); + service.checkParentLimit(0, "test-thread"); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError(e); + } catch (CircuitBreakingException e) { + // very rare + logger.info("Thread got semi-unexpected circuit breaking exception", e); + } + })); + } + + threads.forEach(Thread::start); + barrier.await(20, TimeUnit.SECONDS); + + for (Thread thread : threads) { + thread.join(10000); + } + threads.forEach(thread -> assertFalse(thread.isAlive())); + + assertThat(leaderTriggerCount.get(), equalTo(2)); + } + + public void testParentDoesOverLimitCheck() { + long g1RegionSize = JvmInfo.jvmInfo().getG1RegionSize(); + + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE) + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%") + .build(); + boolean saveTheDay = randomBoolean(); + AtomicBoolean overLimitTriggered = new AtomicBoolean(); + final HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + Collections.emptyList(), + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + trackRealMemoryUsage -> + memoryUsed -> { + assertTrue(overLimitTriggered.compareAndSet(false, true)); + if (saveTheDay) { + return new HierarchyCircuitBreakerService.MemoryUsage(memoryUsed.baseUsage / 2, + memoryUsed.totalUsage - (memoryUsed.baseUsage / 2), memoryUsed.transientChildUsage, + memoryUsed.permanentChildUsage); + } else { + return memoryUsed; + } + } + ); + + int allocationSize = g1RegionSize > 0 ? (int) (g1RegionSize / 2) : 1024 * 1024; + int allocationCount = (int) (JvmInfo.jvmInfo().getConfiguredMaxHeapSize() / allocationSize) + 1; + List data = new ArrayList<>(); + try { + for (int i = 0 ; i < allocationCount && overLimitTriggered.get() == false; ++i) { + data.add(new byte[allocationSize]); + service.checkParentLimit(0, "test"); + } + assertTrue(saveTheDay); + } catch (CircuitBreakingException e) { + assertFalse(saveTheDay); + } + + logger.trace("black hole [{}]", data.hashCode()); + } + + public void testFallbackG1RegionSize() { + assumeTrue("Only G1GC can utilize the over limit check", JvmInfo.jvmInfo().useG1GC().equals("true")); + assumeTrue("Must have region size", JvmInfo.jvmInfo().getG1RegionSize() > 0); + + assertThat(HierarchyCircuitBreakerService.G1OverLimitStrategy.fallbackRegionSize(JvmInfo.jvmInfo()), + equalTo(JvmInfo.jvmInfo().getG1RegionSize())); + } + + public void testG1OverLimitStrategyBreakOnMemory() { + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + AtomicInteger leaderTriggerCount = new AtomicInteger(); + AtomicInteger nonLeaderTriggerCount = new AtomicInteger(); + long interval = randomLongBetween(1, 1000); + AtomicLong memoryUsage = new AtomicLong(); + + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), memoryUsage::get, () -> 0, time::get, interval, + TimeValue.timeValueSeconds(30)) { + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } else { + nonLeaderTriggerCount.incrementAndGet(); + } + } + }; + memoryUsage.set(randomLongBetween(100, 110)); + HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, randomLongBetween(100, 110), + randomLongBetween(0, 50), + randomLongBetween(0, 50)); + + assertThat(strategy.overLimit(input), sameInstance(input)); + assertThat(leaderTriggerCount.get(), equalTo(1)); + + memoryUsage.set(99); + HierarchyCircuitBreakerService.MemoryUsage output = strategy.overLimit(input); + assertThat(output, not(sameInstance(input))); + assertThat(output.baseUsage, equalTo(memoryUsage.get())); + assertThat(output.totalUsage, equalTo(99 + input.totalUsage - 100)); + assertThat(output.transientChildUsage, equalTo(input.transientChildUsage)); + assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage)); + assertThat(nonLeaderTriggerCount.get(), equalTo(1)); + + time.addAndGet(randomLongBetween(interval, interval * 2)); + output = strategy.overLimit(input); + assertThat(output, not(sameInstance(input))); + assertThat(output.baseUsage, equalTo(memoryUsage.get())); + assertThat(output.totalUsage, equalTo(99 + input.totalUsage - 100)); + assertThat(output.transientChildUsage, equalTo(input.transientChildUsage)); + assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage)); + assertThat(leaderTriggerCount.get(), equalTo(2)); + } + + public void testG1OverLimitStrategyBreakOnGcCount() { + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + AtomicInteger leaderTriggerCount = new AtomicInteger(); + AtomicInteger nonLeaderTriggerCount = new AtomicInteger(); + long interval = randomLongBetween(1, 1000); + AtomicLong memoryUsageCounter = new AtomicLong(); + AtomicLong gcCounter = new AtomicLong(); + LongSupplier memoryUsageSupplier = () -> { + memoryUsageCounter.incrementAndGet(); + return randomLongBetween(100, 110); + }; + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), + memoryUsageSupplier, + gcCounter::incrementAndGet, + time::get, interval, TimeValue.timeValueSeconds(30)) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } else { + nonLeaderTriggerCount.incrementAndGet(); + } + } + }; + HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, randomLongBetween(100, 110), + randomLongBetween(0, 50), + randomLongBetween(0, 50)); + + assertThat(strategy.overLimit(input), sameInstance(input)); + assertThat(leaderTriggerCount.get(), equalTo(1)); + assertThat(gcCounter.get(), equalTo(2L)); + assertThat(memoryUsageCounter.get(), equalTo(2L)); // 1 before gc count break and 1 to get resulting memory usage. + } + + public void testG1OverLimitStrategyThrottling() throws InterruptedException, BrokenBarrierException, TimeoutException { + AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2)); + AtomicInteger leaderTriggerCount = new AtomicInteger(); + long interval = randomLongBetween(1, 1000); + AtomicLong memoryUsage = new AtomicLong(); + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), memoryUsage::get, () -> 0, + time::get, interval, TimeValue.timeValueSeconds(30)) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + leaderTriggerCount.incrementAndGet(); + } + } + }; + + int threadCount = randomIntBetween(1, 10); + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); + AtomicReference countDown = new AtomicReference<>(new CountDownLatch(randomIntBetween(1, 20))); + List threads = IntStream.range(0, threadCount) + .mapToObj(i -> new Thread(() -> { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError(e); + } + do { + HierarchyCircuitBreakerService.MemoryUsage input = + new HierarchyCircuitBreakerService.MemoryUsage(randomLongBetween(0, 100), randomLongBetween(0, 100), + randomLongBetween(0, 100), randomLongBetween(0, 100)); + HierarchyCircuitBreakerService.MemoryUsage output = strategy.overLimit(input); + assertThat(output.totalUsage, equalTo(output.baseUsage + input.totalUsage - input.baseUsage)); + assertThat(output.transientChildUsage, equalTo(input.transientChildUsage)); + assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage)); + countDown.get().countDown(); + } while (!Thread.interrupted()); + })).collect(Collectors.toList()); + + threads.forEach(Thread::start); + barrier.await(20, TimeUnit.SECONDS); + + int iterationCount = randomIntBetween(1, 5); + for (int i = 0; i < iterationCount; ++i) { + memoryUsage.set(randomLongBetween(0, 100)); + assertTrue(countDown.get().await(20, TimeUnit.SECONDS)); + assertThat(leaderTriggerCount.get(), lessThanOrEqualTo(i + 1)); + assertThat(leaderTriggerCount.get(), greaterThanOrEqualTo(i / 2 + 1)); + time.addAndGet(randomLongBetween(interval, interval * 2)); + countDown.set(new CountDownLatch(randomIntBetween(1, 20))); + } + + threads.forEach(Thread::interrupt); + for (Thread thread : threads) { + thread.join(10000); + } + threads.forEach(thread -> assertFalse(thread.isAlive())); + } + + public void testCreateOverLimitStrategy() { + assertThat(HierarchyCircuitBreakerService.createOverLimitStrategy(false), + not(instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class))); + HierarchyCircuitBreakerService.OverLimitStrategy overLimitStrategy = HierarchyCircuitBreakerService.createOverLimitStrategy(true); + if (JvmInfo.jvmInfo().useG1GC().equals("true")) { + assertThat(overLimitStrategy, instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class)); + assertThat(((HierarchyCircuitBreakerService.G1OverLimitStrategy) overLimitStrategy).getLockTimeout(), + equalTo(TimeValue.timeValueMillis(500))); + } else { + assertThat(overLimitStrategy, not(instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class))); + } + } + + public void testG1LockTimeout() throws Exception { + CountDownLatch startedBlocking = new CountDownLatch(1); + CountDownLatch blockingUntil = new CountDownLatch(1); + AtomicLong gcCounter = new AtomicLong(); + HierarchyCircuitBreakerService.G1OverLimitStrategy strategy = + new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), () -> 100, gcCounter::incrementAndGet, + () -> 0, 1, TimeValue.timeValueMillis(randomFrom(0, 5, 10))) { + + @Override + void overLimitTriggered(boolean leader) { + if (leader) { + startedBlocking.countDown(); + try { + // this is the central assertion - the overLimit call below should complete in a timely manner. + assertThat(blockingUntil.await(10, TimeUnit.SECONDS), is(true)); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + } + }; + + HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, 100, 0, 0); + Thread blocker = new Thread(() -> { + strategy.overLimit(input); + }); + blocker.start(); + try { + assertThat(startedBlocking.await(10, TimeUnit.SECONDS), is(true)); + + // this should complete in a timely manner, verified by the assertion in the thread. + assertThat(strategy.overLimit(input), sameInstance(input)); + } finally { + blockingUntil.countDown(); + blocker.join(10000); + assertThat(blocker.isAlive(), is(false)); + } + } + public void testTrippedCircuitBreakerDurability() { Settings clusterSettings = Settings.builder() .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.FALSE)