Circuit-break based on real memory usage
With this commit we introduce a new circuit-breaking strategy to the parent circuit breaker. Contrary to the current implementation which only accounts for memory reserved via child circuit breakers, the new strategy measures real heap memory usage at the time of reservation. This allows us to be much more aggressive with the circuit breaker limit so we bump it to 95% by default. The new strategy is turned on by default and can be controlled with the new cluster setting `indices.breaker.total.userealmemory`. Note that we turn it off for all integration tests with an internal test cluster because it leads to spurious test failures which are of no value (we cannot fully control heap memory usage in tests). All REST tests, however, will make use of the real memory circuit breaker. Relates #31767
This commit is contained in:
parent
d2461643cd
commit
f174f72fee
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.benchmark.indices.breaker;
|
||||
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.Threads;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Fork(3)
|
||||
@Warmup(iterations = 10)
|
||||
@Measurement(iterations = 10)
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
@State(Scope.Benchmark)
|
||||
@SuppressWarnings("unused") //invoked by benchmarking framework
|
||||
public class MemoryStatsBenchmark {
|
||||
private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
|
||||
|
||||
@Param({"0", "16", "256", "4096"})
|
||||
private int tokens;
|
||||
|
||||
@Benchmark
|
||||
public void baseline() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public long getMemoryStats_01() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(2)
|
||||
public long getMemoryStats_02() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(4)
|
||||
public long getMemoryStats_04() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(8)
|
||||
public long getMemoryStats_08() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(16)
|
||||
public long getMemoryStats_16() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(32)
|
||||
public long getMemoryStats_32() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(64)
|
||||
public long getMemoryStats_64() {
|
||||
Blackhole.consumeCPU(tokens);
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
}
|
||||
|
|
@ -69,4 +69,12 @@ The following previously deprecated url parameter have been removed:
|
|||
|
||||
Previously the in flight requests circuit breaker considered only the raw byte representation.
|
||||
By bumping the value of `network.breaker.inflight_requests.overhead` from 1 to 2, this circuit
|
||||
breaker considers now also the memory overhead of representing the request as a structured object.
|
||||
breaker considers now also the memory overhead of representing the request as a structured object.
|
||||
|
||||
==== Parent circuit breaker changes
|
||||
|
||||
The parent circuit breaker defines a new setting `indices.breaker.total.use_real_memory` which is
|
||||
`true` by default. This means that the parent circuit breaker will trip based on currently used
|
||||
heap memory instead of only considering the reserved memory by child circuit breakers. When this
|
||||
setting is `true`, the default parent breaker limit also changes from 70% to 95% of the JVM heap size.
|
||||
The previous behavior can be restored by setting `indices.breaker.total.use_real_memory` to `false`.
|
||||
|
|
|
@ -13,11 +13,18 @@ These settings can be dynamically updated on a live cluster with the
|
|||
[float]
|
||||
==== Parent circuit breaker
|
||||
|
||||
The parent-level breaker can be configured with the following setting:
|
||||
The parent-level breaker can be configured with the following settings:
|
||||
|
||||
`indices.breaker.total.use_real_memory`::
|
||||
|
||||
Whether the parent breaker should take real memory usage into account (`true`) or only
|
||||
consider the amount that is reserved by child circuit breakers (`false`). Defaults to `true`.
|
||||
|
||||
`indices.breaker.total.limit`::
|
||||
|
||||
Starting limit for overall parent breaker, defaults to 70% of JVM heap.
|
||||
Starting limit for overall parent breaker, defaults to 70% of JVM heap if
|
||||
`indices.breaker.total.use_real_memory` is `false`. If `indices.breaker.total.use_real_memory`
|
||||
is `true`, defaults to 95% of the JVM heap.
|
||||
|
||||
[[fielddata-circuit-breaker]]
|
||||
[float]
|
||||
|
|
|
@ -125,7 +125,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker {
|
|||
|
||||
// Additionally, we need to check that we haven't exceeded the parent's limit
|
||||
try {
|
||||
parent.checkParentLimit(label);
|
||||
parent.checkParentLimit((long) (bytes * overheadConstant), label);
|
||||
} catch (CircuitBreakingException e) {
|
||||
// If the parent breaker is tripped, this breaker has to be
|
||||
// adjusted back down because the allocation is "blocked" but the
|
||||
|
|
|
@ -254,6 +254,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
|
||||
HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING,
|
||||
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
|
||||
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -44,10 +46,21 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
|
||||
private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker.";
|
||||
|
||||
private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
|
||||
|
||||
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();
|
||||
|
||||
public static final Setting<Boolean> USE_REAL_MEMORY_USAGE_SETTING =
|
||||
Setting.boolSetting("indices.breaker.total.use_real_memory", true, Property.NodeScope);
|
||||
|
||||
public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING =
|
||||
Setting.memorySizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope);
|
||||
Setting.memorySizeSetting("indices.breaker.total.limit", settings -> {
|
||||
if (USE_REAL_MEMORY_USAGE_SETTING.get(settings)) {
|
||||
return "95%";
|
||||
} else {
|
||||
return "70%";
|
||||
}
|
||||
}, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING =
|
||||
Setting.memorySizeSetting("indices.breaker.fielddata.limit", "60%", Property.Dynamic, Property.NodeScope);
|
||||
|
@ -77,6 +90,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
|
||||
new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);
|
||||
|
||||
private final boolean trackRealMemoryUsage;
|
||||
private volatile BreakerSettings parentSettings;
|
||||
private volatile BreakerSettings fielddataSettings;
|
||||
private volatile BreakerSettings inFlightRequestsSettings;
|
||||
|
@ -120,6 +134,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
|
||||
}
|
||||
|
||||
this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings);
|
||||
|
||||
registerBreaker(this.requestSettings);
|
||||
registerBreaker(this.fielddataSettings);
|
||||
registerBreaker(this.inFlightRequestsSettings);
|
||||
|
@ -191,17 +207,15 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
|
||||
@Override
|
||||
public AllCircuitBreakerStats stats() {
|
||||
long parentEstimated = 0;
|
||||
List<CircuitBreakerStats> allStats = new ArrayList<>(this.breakers.size());
|
||||
// Gather the "estimated" count for the parent breaker by adding the
|
||||
// estimations for each individual breaker
|
||||
for (CircuitBreaker breaker : this.breakers.values()) {
|
||||
allStats.add(stats(breaker.getName()));
|
||||
parentEstimated += breaker.getUsed();
|
||||
}
|
||||
// Manually add the parent breaker settings since they aren't part of the breaker map
|
||||
allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
|
||||
parentEstimated, 1.0, parentTripCount.get()));
|
||||
parentUsed(0L), 1.0, parentTripCount.get()));
|
||||
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
|
||||
}
|
||||
|
||||
|
@ -211,15 +225,28 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
|
|||
return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
|
||||
}
|
||||
|
||||
private long parentUsed(long newBytesReserved) {
|
||||
if (this.trackRealMemoryUsage) {
|
||||
return currentMemoryUsage() + newBytesReserved;
|
||||
} else {
|
||||
long parentEstimated = 0;
|
||||
for (CircuitBreaker breaker : this.breakers.values()) {
|
||||
parentEstimated += breaker.getUsed() * breaker.getOverhead();
|
||||
}
|
||||
return parentEstimated;
|
||||
}
|
||||
}
|
||||
|
||||
//package private to allow overriding it in tests
|
||||
long currentMemoryUsage() {
|
||||
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the parent breaker has been tripped
|
||||
*/
|
||||
public void checkParentLimit(String label) throws CircuitBreakingException {
|
||||
long totalUsed = 0;
|
||||
for (CircuitBreaker breaker : this.breakers.values()) {
|
||||
totalUsed += (breaker.getUsed() * breaker.getOverhead());
|
||||
}
|
||||
|
||||
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
|
||||
long totalUsed = parentUsed(newBytesReserved);
|
||||
long parentLimit = this.parentSettings.getLimit();
|
||||
if (totalUsed > parentLimit) {
|
||||
this.parentTripCount.incrementAndGet();
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.common.settings;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.IndexingMemoryController;
|
||||
|
@ -57,8 +58,15 @@ public class MemorySizeSettingsTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testCircuitBreakerSettings() {
|
||||
// default is chosen based on actual heap size
|
||||
double defaultTotalPercentage;
|
||||
if (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() < new ByteSizeValue(1, ByteSizeUnit.GB).getBytes()) {
|
||||
defaultTotalPercentage = 0.95d;
|
||||
} else {
|
||||
defaultTotalPercentage = 0.7d;
|
||||
}
|
||||
assertMemorySizeSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.total.limit",
|
||||
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.7)));
|
||||
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * defaultTotalPercentage)));
|
||||
assertMemorySizeSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.fielddata.limit",
|
||||
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.6)));
|
||||
assertMemorySizeSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.request.limit",
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -56,7 +57,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkParentLimit(String label) throws CircuitBreakingException {
|
||||
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
|
||||
// never trip
|
||||
}
|
||||
};
|
||||
|
@ -114,7 +115,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkParentLimit(String label) throws CircuitBreakingException {
|
||||
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
|
||||
// Parent will trip right before regular breaker would trip
|
||||
if (getBreaker(CircuitBreaker.REQUEST).getUsed() > parentLimit) {
|
||||
parentTripped.incrementAndGet();
|
||||
|
@ -170,6 +171,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
|||
*/
|
||||
public void testBorrowingSiblingBreakerMemory() throws Exception {
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200mb")
|
||||
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
|
||||
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
|
||||
|
@ -199,4 +201,50 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
|
|||
assertThat(exception.getMessage(), containsString("which is larger than the limit of [209715200/200mb]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testParentBreaksOnRealMemoryUsage() throws Exception {
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE)
|
||||
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200b")
|
||||
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "300b")
|
||||
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 2)
|
||||
.build();
|
||||
|
||||
AtomicLong memoryUsage = new AtomicLong();
|
||||
final CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
|
||||
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
|
||||
@Override
|
||||
long currentMemoryUsage() {
|
||||
return memoryUsage.get();
|
||||
}
|
||||
};
|
||||
final CircuitBreaker requestBreaker = service.getBreaker(CircuitBreaker.REQUEST);
|
||||
|
||||
// anything below 100 bytes should work (overhead) - current memory usage is zero
|
||||
requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 99), "request");
|
||||
assertEquals(0, requestBreaker.getTrippedCount());
|
||||
// assume memory usage has increased to 150 bytes
|
||||
memoryUsage.set(150);
|
||||
|
||||
// a reservation that bumps memory usage to less than 200 (150 bytes used + reservation < 200)
|
||||
requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 24), "request");
|
||||
assertEquals(0, requestBreaker.getTrippedCount());
|
||||
memoryUsage.set(181);
|
||||
|
||||
long reservationInBytes = randomLongBetween(10, 50);
|
||||
// anything >= 20 bytes (10 bytes * 2 overhead) reservation breaks the parent but it must be low enough to avoid
|
||||
// breaking the child breaker.
|
||||
CircuitBreakingException exception = expectThrows(CircuitBreakingException.class, () -> requestBreaker
|
||||
.addEstimateBytesAndMaybeBreak(reservationInBytes, "request"));
|
||||
// it was the parent that rejected the reservation
|
||||
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [request] would be"));
|
||||
assertThat(exception.getMessage(), containsString("which is larger than the limit of [200/200b]"));
|
||||
assertEquals(0, requestBreaker.getTrippedCount());
|
||||
assertEquals(1, service.stats().getStats(CircuitBreaker.PARENT).getTrippedCount());
|
||||
|
||||
// lower memory usage again - the same reservation should succeed
|
||||
memoryUsage.set(100);
|
||||
requestBreaker.addEstimateBytesAndMaybeBreak(reservationInBytes, "request");
|
||||
assertEquals(0, requestBreaker.getTrippedCount());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -394,6 +394,10 @@ public final class InternalTestCluster extends TestCluster {
|
|||
builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(), new TimeValue(RandomNumbers.randomIntBetween(random, 10, 30), TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
|
||||
// turn it off for these tests.
|
||||
builder.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false);
|
||||
|
||||
if (random.nextInt(10) == 0) {
|
||||
builder.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop");
|
||||
builder.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop");
|
||||
|
|
Loading…
Reference in New Issue