Enhance real memory circuit breaker with G1 GC (#58674) (#59394)

Using G1 GC, Elasticsearch can rarely trigger that heap usage goes above
the real memory circuit breaker limit and stays there for an extended
period. This situation will persist until the next young GC. The circuit
breaking itself hinders that from occurring in a timely manner since it
breaks all request before real work is done.

This commit gently nudges G1 to do a young GC and then double checks
that heap usage is still above the real memory circuit breaker limit
before throwing the circuit breaker exception.

Related to #57202
This commit is contained in:
Henning Andersen 2020-07-13 17:41:09 +02:00 committed by GitHub
parent b1b7bf3912
commit adf6083dd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 625 additions and 6 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.Assertions; import org.elasticsearch.Assertions;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -57,6 +58,19 @@ public class ReleasableLock implements Releasable {
return this; return this;
} }
/**
* Try acquiring lock, returning null if unable to acquire lock within timeout.
*/
public ReleasableLock tryAcquire(TimeValue timeout) throws InterruptedException {
boolean locked = lock.tryLock(timeout.duration(), timeout.timeUnit());
if (locked) {
assert addCurrentThread();
return this;
} else {
return null;
}
}
private boolean addCurrentThread() { private boolean addCurrentThread() {
final Integer current = holdingThreads.get(); final Integer current = holdingThreads.get();
holdingThreads.set(current == null ? 1 : current + 1); holdingThreads.set(current == null ? 1 : current + 1);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.indices.breaker;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.breaker.CircuitBreakingException;
@ -30,8 +31,14 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.monitor.jvm.GcNames;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean; import java.lang.management.MemoryMXBean;
import java.util.ArrayList; import java.util.ArrayList;
@ -40,6 +47,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING; import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING;
@ -104,7 +114,14 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
// Tripped count for when redistribution was attempted but wasn't successful // Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0); private final AtomicLong parentTripCount = new AtomicLong(0);
private final OverLimitStrategy overLimitStrategy;
public HierarchyCircuitBreakerService(Settings settings, List<BreakerSettings> customBreakers, ClusterSettings clusterSettings) { public HierarchyCircuitBreakerService(Settings settings, List<BreakerSettings> customBreakers, ClusterSettings clusterSettings) {
this(settings, customBreakers, clusterSettings, HierarchyCircuitBreakerService::createOverLimitStrategy);
}
HierarchyCircuitBreakerService(Settings settings, List<BreakerSettings> customBreakers, ClusterSettings clusterSettings,
Function<Boolean, OverLimitStrategy> overLimitStrategyFactory) {
super(); super();
HashMap<String, CircuitBreaker> childCircuitBreakers = new HashMap<>(); HashMap<String, CircuitBreaker> childCircuitBreakers = new HashMap<>();
childCircuitBreakers.put(CircuitBreaker.FIELDDATA, validateAndCreateBreaker( childCircuitBreakers.put(CircuitBreaker.FIELDDATA, validateAndCreateBreaker(
@ -168,6 +185,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
CIRCUIT_BREAKER_OVERHEAD_SETTING, CIRCUIT_BREAKER_OVERHEAD_SETTING,
(name, updatedValues) -> updateCircuitBreakerSettings(name, updatedValues.v1(), updatedValues.v2()), (name, updatedValues) -> updateCircuitBreakerSettings(name, updatedValues.v1(), updatedValues.v2()),
(s, t) -> {}); (s, t) -> {});
this.overLimitStrategy = overLimitStrategyFactory.apply(this.trackRealMemoryUsage);
} }
private void updateCircuitBreakerSettings(String name, ByteSizeValue newLimit, Double newOverhead) { private void updateCircuitBreakerSettings(String name, ByteSizeValue newLimit, Double newOverhead) {
@ -231,7 +250,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
breaker.getTrippedCount()); breaker.getTrippedCount());
} }
private static class MemoryUsage { static class MemoryUsage {
final long baseUsage; final long baseUsage;
final long totalUsage; final long totalUsage;
final long transientChildUsage; final long transientChildUsage;
@ -268,6 +287,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
//package private to allow overriding it in tests //package private to allow overriding it in tests
long currentMemoryUsage() { long currentMemoryUsage() {
return realMemoryUsage();
}
static long realMemoryUsage() {
try { try {
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
} catch (IllegalArgumentException ex) { } catch (IllegalArgumentException ex) {
@ -290,7 +313,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
final MemoryUsage memoryUsed = memoryUsed(newBytesReserved); final MemoryUsage memoryUsed = memoryUsed(newBytesReserved);
long parentLimit = this.parentSettings.getLimit(); long parentLimit = this.parentSettings.getLimit();
if (memoryUsed.totalUsage > parentLimit) { if (memoryUsed.totalUsage > parentLimit && overLimitStrategy.overLimit(memoryUsed).totalUsage > parentLimit) {
this.parentTripCount.incrementAndGet(); this.parentTripCount.incrementAndGet();
final StringBuilder message = new StringBuilder("[parent] Data too large, data for [" + label + "]" + final StringBuilder message = new StringBuilder("[parent] Data too large, data for [" + label + "]" +
" would be [" + memoryUsed.totalUsage + "/" + new ByteSizeValue(memoryUsed.totalUsage) + "]" + " would be [" + memoryUsed.totalUsage + "/" + new ByteSizeValue(memoryUsed.totalUsage) + "]" +
@ -334,4 +357,163 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
this, this,
breakerSettings.getName()); breakerSettings.getName());
} }
static OverLimitStrategy createOverLimitStrategy(boolean trackRealMemoryUsage) {
JvmInfo jvmInfo = JvmInfo.jvmInfo();
if (trackRealMemoryUsage && jvmInfo.useG1GC().equals("true")
// messing with GC is "dangerous" so we apply an escape hatch. Not intended to be used.
&& Booleans.parseBoolean(System.getProperty("es.real_memory_circuit_breaker.g1_over_limit_strategy.enabled"), true)) {
TimeValue lockTimeout = TimeValue.timeValueMillis(
Integer.parseInt(System.getProperty("es.real_memory_circuit_breaker.g1_over_limit_strategy.lock_timeout_ms", "500"))
);
// hardcode interval, do not want any tuning of it outside code changes.
return new G1OverLimitStrategy(jvmInfo, HierarchyCircuitBreakerService::realMemoryUsage, createYoungGcCountSupplier(),
System::currentTimeMillis, 5000, lockTimeout);
} else {
return memoryUsed -> memoryUsed;
}
}
static LongSupplier createYoungGcCountSupplier() {
List<GarbageCollectorMXBean> youngBeans =
ManagementFactory.getGarbageCollectorMXBeans().stream()
.filter(mxBean -> GcNames.getByGcName(mxBean.getName(), mxBean.getName()).equals(GcNames.YOUNG))
.collect(Collectors.toList());
assert youngBeans.size() == 1;
assert youngBeans.get(0).getCollectionCount() != -1 : "G1 must support getting collection count";
if (youngBeans.size() == 1) {
return youngBeans.get(0)::getCollectionCount;
} else {
logger.warn("Unable to find young generation collector, G1 over limit strategy might be impacted [{}]", youngBeans);
return () -> -1;
}
}
interface OverLimitStrategy {
MemoryUsage overLimit(MemoryUsage memoryUsed);
}
static class G1OverLimitStrategy implements OverLimitStrategy {
private final long g1RegionSize;
private final LongSupplier currentMemoryUsageSupplier;
private final LongSupplier gcCountSupplier;
private final LongSupplier timeSupplier;
private final TimeValue lockTimeout;
private final long maxHeap;
private long lastCheckTime = Long.MIN_VALUE;
private final long minimumInterval;
private long blackHole;
private final ReleasableLock lock = new ReleasableLock(new ReentrantLock());
G1OverLimitStrategy(JvmInfo jvmInfo, LongSupplier currentMemoryUsageSupplier,
LongSupplier gcCountSupplier,
LongSupplier timeSupplier, long minimumInterval, TimeValue lockTimeout) {
this.lockTimeout = lockTimeout;
assert minimumInterval > 0;
this.currentMemoryUsageSupplier = currentMemoryUsageSupplier;
this.gcCountSupplier = gcCountSupplier;
this.timeSupplier = timeSupplier;
this.minimumInterval = minimumInterval;
this.maxHeap = jvmInfo.getMem().getHeapMax().getBytes();
long g1RegionSize = jvmInfo.getG1RegionSize();
if (g1RegionSize <= 0) {
this.g1RegionSize = fallbackRegionSize(jvmInfo);
} else {
this.g1RegionSize = g1RegionSize;
}
}
static long fallbackRegionSize(JvmInfo jvmInfo) {
// mimick JDK calculation based on JDK 14 source:
// https://hg.openjdk.java.net/jdk/jdk14/file/6c954123ee8d/src/hotspot/share/gc/g1/heapRegion.cpp#l65
// notice that newer JDKs will have a slight variant only considering max-heap:
// https://hg.openjdk.java.net/jdk/jdk/file/e7d0ec2d06e8/src/hotspot/share/gc/g1/heapRegion.cpp#l67
// based on this JDK "bug":
// https://bugs.openjdk.java.net/browse/JDK-8241670
long averageHeapSize =
(jvmInfo.getMem().getHeapMax().getBytes() + JvmInfo.jvmInfo().getMem().getHeapMax().getBytes()) / 2;
long regionSize = Long.highestOneBit(averageHeapSize / 2048);
if (regionSize < ByteSizeUnit.MB.toBytes(1)) {
regionSize = ByteSizeUnit.MB.toBytes(1);
} else if (regionSize > ByteSizeUnit.MB.toBytes(32)) {
regionSize = ByteSizeUnit.MB.toBytes(32);
}
return regionSize;
}
@Override
public MemoryUsage overLimit(MemoryUsage memoryUsed) {
boolean leader = false;
int allocationIndex = 0;
long allocationDuration = 0;
try (ReleasableLock locked = lock.tryAcquire(lockTimeout)) {
if (locked != null) {
long begin = timeSupplier.getAsLong();
leader = begin >= lastCheckTime + minimumInterval;
overLimitTriggered(leader);
if (leader) {
long initialCollectionCount = gcCountSupplier.getAsLong();
logger.info("attempting to trigger G1GC due to high heap usage [{}]", memoryUsed.baseUsage);
long localBlackHole = 0;
// number of allocations, corresponding to (approximately) number of free regions + 1
int allocationCount = Math.toIntExact((maxHeap - memoryUsed.baseUsage) / g1RegionSize + 1);
// allocations of half-region size becomes single humongous alloc, thus taking up a full region.
int allocationSize = (int) (g1RegionSize >> 1);
long maxUsageObserved = memoryUsed.baseUsage;
for (; allocationIndex < allocationCount; ++allocationIndex) {
long current = currentMemoryUsageSupplier.getAsLong();
if (current >= maxUsageObserved) {
maxUsageObserved = current;
} else {
// we observed a memory drop, so some GC must have occurred
break;
}
if (initialCollectionCount != gcCountSupplier.getAsLong()) {
break;
}
localBlackHole += new byte[allocationSize].hashCode();
}
blackHole += localBlackHole;
logger.trace("black hole [{}]", blackHole);
long now = timeSupplier.getAsLong();
this.lastCheckTime = now;
allocationDuration = now - begin;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// fallthrough
}
final long current = currentMemoryUsageSupplier.getAsLong();
if (current < memoryUsed.baseUsage) {
if (leader) {
logger.info("GC did bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]",
memoryUsed.baseUsage, current, allocationIndex, allocationDuration);
}
return new MemoryUsage(current, memoryUsed.totalUsage - memoryUsed.baseUsage + current,
memoryUsed.transientChildUsage, memoryUsed.permanentChildUsage);
} else {
if (leader) {
logger.info("GC did not bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]",
memoryUsed.baseUsage, current, allocationIndex, allocationDuration);
}
// prefer original measurement when reporting if heap usage was not brought down.
return memoryUsed;
}
}
void overLimitTriggered(boolean leader) {
// for tests to override.
}
TimeValue getLockTimeout() {
return lockTimeout;
}
}
} }

View File

@ -98,6 +98,7 @@ public class JvmInfo implements ReportingService.Info {
String onOutOfMemoryError = null; String onOutOfMemoryError = null;
String useCompressedOops = "unknown"; String useCompressedOops = "unknown";
String useG1GC = "unknown"; String useG1GC = "unknown";
long g1RegisionSize = -1;
String useSerialGC = "unknown"; String useSerialGC = "unknown";
long configuredInitialHeapSize = -1; long configuredInitialHeapSize = -1;
long configuredMaxHeapSize = -1; long configuredMaxHeapSize = -1;
@ -130,6 +131,8 @@ public class JvmInfo implements ReportingService.Info {
try { try {
Object useG1GCVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC"); Object useG1GCVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC");
useG1GC = (String) valueMethod.invoke(useG1GCVmOptionObject); useG1GC = (String) valueMethod.invoke(useG1GCVmOptionObject);
Object regionSizeVmOptionObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "G1HeapRegionSize");
g1RegisionSize = Long.parseLong((String) valueMethod.invoke(regionSizeVmOptionObject));
} catch (Exception ignored) { } catch (Exception ignored) {
} }
@ -161,9 +164,11 @@ public class JvmInfo implements ReportingService.Info {
INSTANCE = new JvmInfo(JvmPid.getPid(), System.getProperty("java.version"), runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), INSTANCE = new JvmInfo(JvmPid.getPid(), System.getProperty("java.version"), runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(),
runtimeMXBean.getVmVendor(), bundledJdk, usingBundledJdk, runtimeMXBean.getStartTime(), configuredInitialHeapSize, runtimeMXBean.getVmVendor(), bundledJdk, usingBundledJdk, runtimeMXBean.getStartTime(), configuredInitialHeapSize,
configuredMaxHeapSize, mem, inputArguments, bootClassPath, classPath, systemProperties, gcCollectors, memoryPools, onError, configuredMaxHeapSize, mem, inputArguments, bootClassPath, classPath, systemProperties, gcCollectors, memoryPools, onError,
onOutOfMemoryError, useCompressedOops, useG1GC, useSerialGC); onOutOfMemoryError, useCompressedOops, useG1GC, useSerialGC,
g1RegisionSize);
} }
@SuppressForbidden(reason = "PathUtils#get") @SuppressForbidden(reason = "PathUtils#get")
private static boolean usingBundledJdk() { private static boolean usingBundledJdk() {
/* /*
@ -210,12 +215,13 @@ public class JvmInfo implements ReportingService.Info {
private final String useCompressedOops; private final String useCompressedOops;
private final String useG1GC; private final String useG1GC;
private final String useSerialGC; private final String useSerialGC;
private final long g1RegionSize;
private JvmInfo(long pid, String version, String vmName, String vmVersion, String vmVendor, boolean bundledJdk, Boolean usingBundledJdk, private JvmInfo(long pid, String version, String vmName, String vmVersion, String vmVendor, boolean bundledJdk, Boolean usingBundledJdk,
long startTime, long configuredInitialHeapSize, long configuredMaxHeapSize, Mem mem, String[] inputArguments, long startTime, long configuredInitialHeapSize, long configuredMaxHeapSize, Mem mem, String[] inputArguments,
String bootClassPath, String classPath, Map<String, String> systemProperties, String[] gcCollectors, String bootClassPath, String classPath, Map<String, String> systemProperties, String[] gcCollectors,
String[] memoryPools, String onError, String onOutOfMemoryError, String useCompressedOops, String useG1GC, String[] memoryPools, String onError, String onOutOfMemoryError, String useCompressedOops, String useG1GC,
String useSerialGC) { String useSerialGC, long g1RegionSize) {
this.pid = pid; this.pid = pid;
this.version = version; this.version = version;
this.vmName = vmName; this.vmName = vmName;
@ -238,6 +244,7 @@ public class JvmInfo implements ReportingService.Info {
this.useCompressedOops = useCompressedOops; this.useCompressedOops = useCompressedOops;
this.useG1GC = useG1GC; this.useG1GC = useG1GC;
this.useSerialGC = useSerialGC; this.useSerialGC = useSerialGC;
this.g1RegionSize = g1RegionSize;
} }
public JvmInfo(StreamInput in) throws IOException { public JvmInfo(StreamInput in) throws IOException {
@ -272,6 +279,7 @@ public class JvmInfo implements ReportingService.Info {
this.onOutOfMemoryError = null; this.onOutOfMemoryError = null;
this.useG1GC = "unknown"; this.useG1GC = "unknown";
this.useSerialGC = "unknown"; this.useSerialGC = "unknown";
this.g1RegionSize = -1;
} }
@Override @Override
@ -467,6 +475,10 @@ public class JvmInfo implements ReportingService.Info {
return this.useSerialGC; return this.useSerialGC;
} }
public long getG1RegionSize() {
return g1RegionSize;
}
public String[] getGcCollectors() { public String[] getGcCollectors() {
return gcCollectors; return gcCollectors;
} }

View File

@ -20,13 +20,25 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class ReleasableLockTests extends ESTestCase { public class ReleasableLockTests extends ESTestCase {
@ -79,10 +91,10 @@ public class ReleasableLockTests extends ESTestCase {
} }
private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) { private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) { try (@SuppressWarnings("unused") Releasable outer = randomAcquireMethod(lockToAcquire)) {
assertTrue(lockToAcquire.isHeldByCurrentThread()); assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread());
try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) { try (@SuppressWarnings("unused") Releasable inner = randomAcquireMethod(lockToAcquire)) {
assertTrue(lockToAcquire.isHeldByCurrentThread()); assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread());
} }
@ -94,4 +106,56 @@ public class ReleasableLockTests extends ESTestCase {
assertFalse(otherLock.isHeldByCurrentThread()); assertFalse(otherLock.isHeldByCurrentThread());
} }
private ReleasableLock randomAcquireMethod(ReleasableLock lock) {
if (randomBoolean()) {
return lock.acquire();
} else {
try {
ReleasableLock releasableLock = lock.tryAcquire(TimeValue.timeValueSeconds(30));
assertThat(releasableLock, notNullValue());
return releasableLock;
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
public void testTryAcquire() throws Exception {
ReleasableLock lock = new ReleasableLock(new ReentrantLock());
int numberOfThreads = randomIntBetween(1, 10);
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
AtomicInteger lockedCounter = new AtomicInteger();
int timeout = randomFrom(0, 5, 10);
List<Thread> threads =
IntStream.range(0, numberOfThreads).mapToObj(i -> new Thread(() -> {
try {
barrier.await(10, TimeUnit.SECONDS);
try (ReleasableLock locked = lock.tryAcquire(TimeValue.timeValueMillis(timeout))) {
if (locked != null) {
lockedCounter.incrementAndGet();
}
}
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError(e);
}
})).collect(Collectors.toList());
threads.forEach(Thread::start);
try (ReleasableLock locked = randomBoolean() ? lock.acquire() : null) {
barrier.await(10, TimeUnit.SECONDS);
for (Thread thread : threads) {
thread.join(10000);
}
threads.forEach(t -> assertThat(t.isAlive(), is(false)));
if (locked != null) {
assertThat(lockedCounter.get(), equalTo(0));
} else {
assertThat(lockedCounter.get(), greaterThanOrEqualTo(1));
}
}
try (ReleasableLock locked = lock.tryAcquire(TimeValue.ZERO)) {
assertThat(locked, notNullValue());
}
}
} }

View File

@ -27,22 +27,38 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class HierarchyCircuitBreakerServiceTests extends ESTestCase { public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
@ -270,6 +286,337 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
assertEquals(0, requestBreaker.getTrippedCount()); assertEquals(0, requestBreaker.getTrippedCount());
} }
/**
* "Integration test" checking that we ask the G1 over limit check before parent breaking.
* Given that it depends on GC, the main assertion that we do not get a circuit breaking exception in the threads towards
* the end of the test is not enabled. The following tests checks this in more unit test style.
*/
public void testParentTriggersG1GCBeforeBreaking() throws InterruptedException, TimeoutException, BrokenBarrierException {
assumeTrue("Only G1GC can utilize the over limit check", JvmInfo.jvmInfo().useG1GC().equals("true"));
long g1RegionSize = JvmInfo.jvmInfo().getG1RegionSize();
assumeTrue("Must have region size", g1RegionSize > 0);
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE)
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%")
.build();
AtomicInteger leaderTriggerCount = new AtomicInteger();
AtomicReference<Consumer<Boolean>> onOverLimit = new AtomicReference<>(leader -> {});
AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2));
long interval = randomLongBetween(1, 1000);
final HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
Collections.emptyList(),
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
trackRealMemoryUsage -> new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(),
HierarchyCircuitBreakerService::realMemoryUsage,
HierarchyCircuitBreakerService.createYoungGcCountSupplier(), time::get, interval, TimeValue.timeValueSeconds(30)) {
@Override
void overLimitTriggered(boolean leader) {
if (leader) {
leaderTriggerCount.incrementAndGet();
}
onOverLimit.get().accept(leader);
}
});
long maxHeap = JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
int regionCount = Math.toIntExact((maxHeap / 2 + g1RegionSize - 1) / g1RegionSize);
// First setup a host of large byte[]'s, must be Humongous objects since those are cleaned during a young phase (no concurrent cycle
// necessary, which is hard to control in the test).
List<byte[]> data = new ArrayList<>();
for (int i = 0; i < regionCount; ++i) {
data.add(new byte[(int) (JvmInfo.jvmInfo().getG1RegionSize() / 2)]);
}
try {
service.checkParentLimit(0, "test");
fail("must exceed memory limit");
} catch (CircuitBreakingException e) {
// OK
}
time.addAndGet(randomLongBetween(interval, interval + 10));
onOverLimit.set(leader -> {
if (leader) {
data.clear();
}
});
logger.trace("black hole [{}]", data.hashCode());
int threadCount = randomIntBetween(1, 10);
CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
List<Thread> threads = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; ++i) {
threads.add(
new Thread(() -> {
try {
barrier.await(10, TimeUnit.SECONDS);
service.checkParentLimit(0, "test-thread");
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError(e);
} catch (CircuitBreakingException e) {
// very rare
logger.info("Thread got semi-unexpected circuit breaking exception", e);
}
}));
}
threads.forEach(Thread::start);
barrier.await(20, TimeUnit.SECONDS);
for (Thread thread : threads) {
thread.join(10000);
}
threads.forEach(thread -> assertFalse(thread.isAlive()));
assertThat(leaderTriggerCount.get(), equalTo(2));
}
public void testParentDoesOverLimitCheck() {
long g1RegionSize = JvmInfo.jvmInfo().getG1RegionSize();
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE)
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%")
.build();
boolean saveTheDay = randomBoolean();
AtomicBoolean overLimitTriggered = new AtomicBoolean();
final HierarchyCircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
Collections.emptyList(),
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
trackRealMemoryUsage ->
memoryUsed -> {
assertTrue(overLimitTriggered.compareAndSet(false, true));
if (saveTheDay) {
return new HierarchyCircuitBreakerService.MemoryUsage(memoryUsed.baseUsage / 2,
memoryUsed.totalUsage - (memoryUsed.baseUsage / 2), memoryUsed.transientChildUsage,
memoryUsed.permanentChildUsage);
} else {
return memoryUsed;
}
}
);
int allocationSize = g1RegionSize > 0 ? (int) (g1RegionSize / 2) : 1024 * 1024;
int allocationCount = (int) (JvmInfo.jvmInfo().getConfiguredMaxHeapSize() / allocationSize) + 1;
List<byte[]> data = new ArrayList<>();
try {
for (int i = 0 ; i < allocationCount && overLimitTriggered.get() == false; ++i) {
data.add(new byte[allocationSize]);
service.checkParentLimit(0, "test");
}
assertTrue(saveTheDay);
} catch (CircuitBreakingException e) {
assertFalse(saveTheDay);
}
logger.trace("black hole [{}]", data.hashCode());
}
public void testFallbackG1RegionSize() {
assumeTrue("Only G1GC can utilize the over limit check", JvmInfo.jvmInfo().useG1GC().equals("true"));
assumeTrue("Must have region size", JvmInfo.jvmInfo().getG1RegionSize() > 0);
assertThat(HierarchyCircuitBreakerService.G1OverLimitStrategy.fallbackRegionSize(JvmInfo.jvmInfo()),
equalTo(JvmInfo.jvmInfo().getG1RegionSize()));
}
public void testG1OverLimitStrategyBreakOnMemory() {
AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2));
AtomicInteger leaderTriggerCount = new AtomicInteger();
AtomicInteger nonLeaderTriggerCount = new AtomicInteger();
long interval = randomLongBetween(1, 1000);
AtomicLong memoryUsage = new AtomicLong();
HierarchyCircuitBreakerService.G1OverLimitStrategy strategy =
new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), memoryUsage::get, () -> 0, time::get, interval,
TimeValue.timeValueSeconds(30)) {
@Override
void overLimitTriggered(boolean leader) {
if (leader) {
leaderTriggerCount.incrementAndGet();
} else {
nonLeaderTriggerCount.incrementAndGet();
}
}
};
memoryUsage.set(randomLongBetween(100, 110));
HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, randomLongBetween(100, 110),
randomLongBetween(0, 50),
randomLongBetween(0, 50));
assertThat(strategy.overLimit(input), sameInstance(input));
assertThat(leaderTriggerCount.get(), equalTo(1));
memoryUsage.set(99);
HierarchyCircuitBreakerService.MemoryUsage output = strategy.overLimit(input);
assertThat(output, not(sameInstance(input)));
assertThat(output.baseUsage, equalTo(memoryUsage.get()));
assertThat(output.totalUsage, equalTo(99 + input.totalUsage - 100));
assertThat(output.transientChildUsage, equalTo(input.transientChildUsage));
assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage));
assertThat(nonLeaderTriggerCount.get(), equalTo(1));
time.addAndGet(randomLongBetween(interval, interval * 2));
output = strategy.overLimit(input);
assertThat(output, not(sameInstance(input)));
assertThat(output.baseUsage, equalTo(memoryUsage.get()));
assertThat(output.totalUsage, equalTo(99 + input.totalUsage - 100));
assertThat(output.transientChildUsage, equalTo(input.transientChildUsage));
assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage));
assertThat(leaderTriggerCount.get(), equalTo(2));
}
public void testG1OverLimitStrategyBreakOnGcCount() {
AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2));
AtomicInteger leaderTriggerCount = new AtomicInteger();
AtomicInteger nonLeaderTriggerCount = new AtomicInteger();
long interval = randomLongBetween(1, 1000);
AtomicLong memoryUsageCounter = new AtomicLong();
AtomicLong gcCounter = new AtomicLong();
LongSupplier memoryUsageSupplier = () -> {
memoryUsageCounter.incrementAndGet();
return randomLongBetween(100, 110);
};
HierarchyCircuitBreakerService.G1OverLimitStrategy strategy =
new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(),
memoryUsageSupplier,
gcCounter::incrementAndGet,
time::get, interval, TimeValue.timeValueSeconds(30)) {
@Override
void overLimitTriggered(boolean leader) {
if (leader) {
leaderTriggerCount.incrementAndGet();
} else {
nonLeaderTriggerCount.incrementAndGet();
}
}
};
HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, randomLongBetween(100, 110),
randomLongBetween(0, 50),
randomLongBetween(0, 50));
assertThat(strategy.overLimit(input), sameInstance(input));
assertThat(leaderTriggerCount.get(), equalTo(1));
assertThat(gcCounter.get(), equalTo(2L));
assertThat(memoryUsageCounter.get(), equalTo(2L)); // 1 before gc count break and 1 to get resulting memory usage.
}
public void testG1OverLimitStrategyThrottling() throws InterruptedException, BrokenBarrierException, TimeoutException {
AtomicLong time = new AtomicLong(randomLongBetween(Long.MIN_VALUE/2, Long.MAX_VALUE/2));
AtomicInteger leaderTriggerCount = new AtomicInteger();
long interval = randomLongBetween(1, 1000);
AtomicLong memoryUsage = new AtomicLong();
HierarchyCircuitBreakerService.G1OverLimitStrategy strategy =
new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), memoryUsage::get, () -> 0,
time::get, interval, TimeValue.timeValueSeconds(30)) {
@Override
void overLimitTriggered(boolean leader) {
if (leader) {
leaderTriggerCount.incrementAndGet();
}
}
};
int threadCount = randomIntBetween(1, 10);
CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
AtomicReference<CountDownLatch> countDown = new AtomicReference<>(new CountDownLatch(randomIntBetween(1, 20)));
List<Thread> threads = IntStream.range(0, threadCount)
.mapToObj(i -> new Thread(() -> {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new AssertionError(e);
}
do {
HierarchyCircuitBreakerService.MemoryUsage input =
new HierarchyCircuitBreakerService.MemoryUsage(randomLongBetween(0, 100), randomLongBetween(0, 100),
randomLongBetween(0, 100), randomLongBetween(0, 100));
HierarchyCircuitBreakerService.MemoryUsage output = strategy.overLimit(input);
assertThat(output.totalUsage, equalTo(output.baseUsage + input.totalUsage - input.baseUsage));
assertThat(output.transientChildUsage, equalTo(input.transientChildUsage));
assertThat(output.permanentChildUsage, equalTo(input.permanentChildUsage));
countDown.get().countDown();
} while (!Thread.interrupted());
})).collect(Collectors.toList());
threads.forEach(Thread::start);
barrier.await(20, TimeUnit.SECONDS);
int iterationCount = randomIntBetween(1, 5);
for (int i = 0; i < iterationCount; ++i) {
memoryUsage.set(randomLongBetween(0, 100));
assertTrue(countDown.get().await(20, TimeUnit.SECONDS));
assertThat(leaderTriggerCount.get(), lessThanOrEqualTo(i + 1));
assertThat(leaderTriggerCount.get(), greaterThanOrEqualTo(i / 2 + 1));
time.addAndGet(randomLongBetween(interval, interval * 2));
countDown.set(new CountDownLatch(randomIntBetween(1, 20)));
}
threads.forEach(Thread::interrupt);
for (Thread thread : threads) {
thread.join(10000);
}
threads.forEach(thread -> assertFalse(thread.isAlive()));
}
public void testCreateOverLimitStrategy() {
assertThat(HierarchyCircuitBreakerService.createOverLimitStrategy(false),
not(instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class)));
HierarchyCircuitBreakerService.OverLimitStrategy overLimitStrategy = HierarchyCircuitBreakerService.createOverLimitStrategy(true);
if (JvmInfo.jvmInfo().useG1GC().equals("true")) {
assertThat(overLimitStrategy, instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class));
assertThat(((HierarchyCircuitBreakerService.G1OverLimitStrategy) overLimitStrategy).getLockTimeout(),
equalTo(TimeValue.timeValueMillis(500)));
} else {
assertThat(overLimitStrategy, not(instanceOf(HierarchyCircuitBreakerService.G1OverLimitStrategy.class)));
}
}
public void testG1LockTimeout() throws Exception {
CountDownLatch startedBlocking = new CountDownLatch(1);
CountDownLatch blockingUntil = new CountDownLatch(1);
AtomicLong gcCounter = new AtomicLong();
HierarchyCircuitBreakerService.G1OverLimitStrategy strategy =
new HierarchyCircuitBreakerService.G1OverLimitStrategy(JvmInfo.jvmInfo(), () -> 100, gcCounter::incrementAndGet,
() -> 0, 1, TimeValue.timeValueMillis(randomFrom(0, 5, 10))) {
@Override
void overLimitTriggered(boolean leader) {
if (leader) {
startedBlocking.countDown();
try {
// this is the central assertion - the overLimit call below should complete in a timely manner.
assertThat(blockingUntil.await(10, TimeUnit.SECONDS), is(true));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
};
HierarchyCircuitBreakerService.MemoryUsage input = new HierarchyCircuitBreakerService.MemoryUsage(100, 100, 0, 0);
Thread blocker = new Thread(() -> {
strategy.overLimit(input);
});
blocker.start();
try {
assertThat(startedBlocking.await(10, TimeUnit.SECONDS), is(true));
// this should complete in a timely manner, verified by the assertion in the thread.
assertThat(strategy.overLimit(input), sameInstance(input));
} finally {
blockingUntil.countDown();
blocker.join(10000);
assertThat(blocker.isAlive(), is(false));
}
}
public void testTrippedCircuitBreakerDurability() { public void testTrippedCircuitBreakerDurability() {
Settings clusterSettings = Settings.builder() Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.FALSE) .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.FALSE)