diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/indices/breaker/MemoryStatsBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/indices/breaker/MemoryStatsBenchmark.java new file mode 100644 index 00000000000..9537cfb0bb3 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/indices/breaker/MemoryStatsBenchmark.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.benchmark.indices.breaker; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.util.concurrent.TimeUnit; + +@Fork(3) +@Warmup(iterations = 10) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +@SuppressWarnings("unused") //invoked by benchmarking framework +public class MemoryStatsBenchmark { + private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); + + @Param({"0", "16", "256", "4096"}) + private int tokens; + + @Benchmark + public void baseline() { + Blackhole.consumeCPU(tokens); + } + + @Benchmark + @Threads(1) + public long getMemoryStats_01() { + Blackhole.consumeCPU(tokens); + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + + @Benchmark + @Threads(2) + public long getMemoryStats_02() { + Blackhole.consumeCPU(tokens); + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + + @Benchmark + @Threads(4) + public long getMemoryStats_04() { + Blackhole.consumeCPU(tokens); + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + + @Benchmark + @Threads(8) + public long getMemoryStats_08() { + Blackhole.consumeCPU(tokens); + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + + @Benchmark + @Threads(16) + public long getMemoryStats_16() { + Blackhole.consumeCPU(tokens); + return 
MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + + @Benchmark + @Threads(32) + public long getMemoryStats_32() { + Blackhole.consumeCPU(tokens); + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + + @Benchmark + @Threads(64) + public long getMemoryStats_64() { + Blackhole.consumeCPU(tokens); + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } +} + diff --git a/docs/reference/migration/migrate_7_0/indices.asciidoc b/docs/reference/migration/migrate_7_0/indices.asciidoc index b03a6014d5b..bab7b602220 100644 --- a/docs/reference/migration/migrate_7_0/indices.asciidoc +++ b/docs/reference/migration/migrate_7_0/indices.asciidoc @@ -69,4 +69,12 @@ The following previously deprecated url parameter have been removed: Previously the in flight requests circuit breaker considered only the raw byte representation. By bumping the value of `network.breaker.inflight_requests.overhead` from 1 to 2, this circuit -breaker considers now also the memory overhead of representing the request as a structured object. \ No newline at end of file +breaker considers now also the memory overhead of representing the request as a structured object. + +==== Parent circuit breaker changes + +The parent circuit breaker defines a new setting `indices.breaker.total.use_real_memory` which is +`true` by default. This means that the parent circuit breaker will trip based on currently used +heap memory instead of only considering the memory reserved by child circuit breakers. When this +setting is `true`, the default parent breaker limit also changes from 70% to 95% of the JVM heap size. +The previous behavior can be restored by setting `indices.breaker.total.use_real_memory` to `false`. 
diff --git a/docs/reference/modules/indices/circuit_breaker.asciidoc b/docs/reference/modules/indices/circuit_breaker.asciidoc index 559137a8210..61a89530c42 100644 --- a/docs/reference/modules/indices/circuit_breaker.asciidoc +++ b/docs/reference/modules/indices/circuit_breaker.asciidoc @@ -13,11 +13,18 @@ These settings can be dynamically updated on a live cluster with the [float] ==== Parent circuit breaker -The parent-level breaker can be configured with the following setting: +The parent-level breaker can be configured with the following settings: + +`indices.breaker.total.use_real_memory`:: + + Whether the parent breaker should take real memory usage into account (`true`) or only + consider the amount that is reserved by child circuit breakers (`false`). Defaults to `true`. `indices.breaker.total.limit`:: - Starting limit for overall parent breaker, defaults to 70% of JVM heap. + Starting limit for overall parent breaker, defaults to 70% of JVM heap if + `indices.breaker.total.use_real_memory` is `false`. If `indices.breaker.total.use_real_memory` + is `true`, defaults to 95% of the JVM heap. 
[[fielddata-circuit-breaker]] [float] diff --git a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java index e4019f9f665..a682565adf7 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java @@ -125,7 +125,7 @@ public class ChildMemoryCircuitBreaker implements CircuitBreaker { // Additionally, we need to check that we haven't exceeded the parent's limit try { - parent.checkParentLimit(label); + parent.checkParentLimit((long) (bytes * overheadConstant), label); } catch (CircuitBreakingException e) { // If the parent breaker is tripped, this breaker has to be // adjusted back down because the allocation is "blocked" but the diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 478325c66f9..03a0f3b42a1 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -254,6 +254,7 @@ public final class ClusterSettings extends AbstractScopedSettings { HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS, HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE, HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE, + HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING, HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index 
e3a914c730e..ebebcfd253c 100644 --- a/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -44,10 +46,21 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker."; + private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); + private final ConcurrentMap breakers = new ConcurrentHashMap<>(); + public static final Setting USE_REAL_MEMORY_USAGE_SETTING = + Setting.boolSetting("indices.breaker.total.use_real_memory", true, Property.NodeScope); + public static final Setting TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = - Setting.memorySizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope); + Setting.memorySizeSetting("indices.breaker.total.limit", settings -> { + if (USE_REAL_MEMORY_USAGE_SETTING.get(settings)) { + return "95%"; + } else { + return "70%"; + } + }, Property.Dynamic, Property.NodeScope); public static final Setting FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.memorySizeSetting("indices.breaker.fielddata.limit", "60%", Property.Dynamic, Property.NodeScope); @@ -77,6 +90,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public static final Setting IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING = new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); + private final boolean trackRealMemoryUsage; 
private volatile BreakerSettings parentSettings; private volatile BreakerSettings fielddataSettings; private volatile BreakerSettings inFlightRequestsSettings; @@ -120,6 +134,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { logger.trace("parent circuit breaker with settings {}", this.parentSettings); } + this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings); + registerBreaker(this.requestSettings); registerBreaker(this.fielddataSettings); registerBreaker(this.inFlightRequestsSettings); @@ -191,17 +207,15 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { @Override public AllCircuitBreakerStats stats() { - long parentEstimated = 0; List allStats = new ArrayList<>(this.breakers.size()); // Gather the "estimated" count for the parent breaker by adding the // estimations for each individual breaker for (CircuitBreaker breaker : this.breakers.values()) { allStats.add(stats(breaker.getName())); - parentEstimated += breaker.getUsed(); } // Manually add the parent breaker settings since they aren't part of the breaker map allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(), - parentEstimated, 1.0, parentTripCount.get())); + parentUsed(0L), 1.0, parentTripCount.get())); return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()])); } @@ -211,15 +225,28 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount()); } + private long parentUsed(long newBytesReserved) { + if (this.trackRealMemoryUsage) { + return currentMemoryUsage() + newBytesReserved; + } else { + long parentEstimated = 0; + for (CircuitBreaker breaker : this.breakers.values()) { + parentEstimated += breaker.getUsed() * breaker.getOverhead(); + } + return parentEstimated; + } + } + + //package private to allow 
overriding it in tests + long currentMemoryUsage() { + return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); + } + /** * Checks whether the parent breaker has been tripped */ - public void checkParentLimit(String label) throws CircuitBreakingException { - long totalUsed = 0; - for (CircuitBreaker breaker : this.breakers.values()) { - totalUsed += (breaker.getUsed() * breaker.getOverhead()); - } - + public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { + long totalUsed = parentUsed(newBytesReserved); long parentLimit = this.parentSettings.getLimit(); if (totalUsed > parentLimit) { this.parentTripCount.incrementAndGet(); diff --git a/server/src/test/java/org/elasticsearch/common/settings/MemorySizeSettingsTests.java b/server/src/test/java/org/elasticsearch/common/settings/MemorySizeSettingsTests.java index 24ce166d158..23392178bf8 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/MemorySizeSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/MemorySizeSettingsTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.settings; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.IndexingMemoryController; @@ -57,8 +58,15 @@ public class MemorySizeSettingsTests extends ESTestCase { } public void testCircuitBreakerSettings() { + // default is chosen based on actual heap size + double defaultTotalPercentage; + if (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() < new ByteSizeValue(1, ByteSizeUnit.GB).getBytes()) { + defaultTotalPercentage = 0.95d; + } else { + defaultTotalPercentage = 0.7d; + } assertMemorySizeSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.total.limit", - new ByteSizeValue((long) 
(JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.7))); + new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * defaultTotalPercentage))); assertMemorySizeSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.fielddata.limit", new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.6))); assertMemorySizeSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.request.limit", diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index a03739b2d9a..00bd15d244f 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.containsString; @@ -56,7 +57,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { } @Override - public void checkParentLimit(String label) throws CircuitBreakingException { + public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { // never trip } }; @@ -114,7 +115,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { } @Override - public void checkParentLimit(String label) throws CircuitBreakingException { + public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { // Parent will trip right before regular breaker would trip if (getBreaker(CircuitBreaker.REQUEST).getUsed() > parentLimit) { 
parentTripped.incrementAndGet(); @@ -170,6 +171,7 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { */ public void testBorrowingSiblingBreakerMemory() throws Exception { Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200mb") .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb") .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb") @@ -199,4 +201,50 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { assertThat(exception.getMessage(), containsString("which is larger than the limit of [209715200/200mb]")); } } + + public void testParentBreaksOnRealMemoryUsage() throws Exception { + Settings clusterSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE) + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200b") + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "300b") + .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 2) + .build(); + + AtomicLong memoryUsage = new AtomicLong(); + final CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, + new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) { + @Override + long currentMemoryUsage() { + return memoryUsage.get(); + } + }; + final CircuitBreaker requestBreaker = service.getBreaker(CircuitBreaker.REQUEST); + + // anything below 100 bytes should work (overhead) - current memory usage is zero + requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 99), "request"); + assertEquals(0, requestBreaker.getTrippedCount()); + // assume memory usage has increased to 150 bytes + memoryUsage.set(150); + + // 
a reservation that bumps memory usage to less than 200 (150 bytes used + reservation < 200) + requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 24), "request"); + assertEquals(0, requestBreaker.getTrippedCount()); + memoryUsage.set(181); + + long reservationInBytes = randomLongBetween(10, 50); + // anything >= 20 bytes (10 bytes * 2 overhead) reservation breaks the parent but it must be low enough to avoid + // breaking the child breaker. + CircuitBreakingException exception = expectThrows(CircuitBreakingException.class, () -> requestBreaker + .addEstimateBytesAndMaybeBreak(reservationInBytes, "request")); + // it was the parent that rejected the reservation + assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [request] would be")); + assertThat(exception.getMessage(), containsString("which is larger than the limit of [200/200b]")); + assertEquals(0, requestBreaker.getTrippedCount()); + assertEquals(1, service.stats().getStats(CircuitBreaker.PARENT).getTrippedCount()); + + // lower memory usage again - the same reservation should succeed + memoryUsage.set(100); + requestBreaker.addEstimateBytesAndMaybeBreak(reservationInBytes, "request"); + assertEquals(0, requestBreaker.getTrippedCount()); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 79965d79f52..a0468091c84 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -394,6 +394,10 @@ public final class InternalTestCluster extends TestCluster { builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(), new TimeValue(RandomNumbers.randomIntBetween(random, 10, 30), TimeUnit.SECONDS)); } + // turning on the real memory circuit breaker leads to spurious test failures. 
As we have no full control over heap usage, we + // turn it off for these tests. + builder.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false); + if (random.nextInt(10) == 0) { builder.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop"); builder.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop");