diff --git a/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java index e3b61199e29..6cc818e03ac 100644 --- a/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java +++ b/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -62,7 +62,7 @@ public interface CircuitBreaker { String ACCOUNTING = "accounting"; enum Type { - // A regular or child MemoryCircuitBreaker + // A regular or ChildMemoryCircuitBreaker MEMORY, // A special parent-type for the hierarchy breaker service PARENT, diff --git a/server/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/server/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java deleted file mode 100644 index bdf210dc420..00000000000 --- a/server/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.common.breaker; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.unit.ByteSizeValue; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * MemoryCircuitBreaker is a circuit breaker that breaks once a - * configurable memory limit has been reached. - */ -public class MemoryCircuitBreaker implements CircuitBreaker { - - private final long memoryBytesLimit; - private final double overheadConstant; - private final AtomicLong used; - private final AtomicLong trippedCount; - private final Logger logger; - - - /** - * Create a circuit breaker that will break if the number of estimated - * bytes grows above the limit. All estimations will be multiplied by - * the given overheadConstant. This breaker starts with 0 bytes used. - * @param limit circuit breaker limit - * @param overheadConstant constant multiplier for byte estimations - */ - public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, Logger logger) { - this(limit, overheadConstant, null, logger); - } - - /** - * Create a circuit breaker that will break if the number of estimated - * bytes grows above the limit. All estimations will be multiplied by - * the given overheadConstant. Uses the given oldBreaker to initialize - * the starting offset. - * @param limit circuit breaker limit - * @param overheadConstant constant multiplier for byte estimations - * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset) - */ - public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, MemoryCircuitBreaker oldBreaker, Logger logger) { - this.memoryBytesLimit = limit.getBytes(); - this.overheadConstant = overheadConstant; - if (oldBreaker == null) { - this.used = new AtomicLong(0); - this.trippedCount = new AtomicLong(0); - } else { - this.used = oldBreaker.used; - this.trippedCount = oldBreaker.trippedCount; - } - this.logger = logger; - if (logger.isTraceEnabled()) { - logger.trace("Creating MemoryCircuitBreaker with a limit of {} bytes ({}) and a overhead constant of {}", - this.memoryBytesLimit, limit, this.overheadConstant); - } - } - - /** - * Method used to trip the breaker - */ - @Override - public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreakingException { - this.trippedCount.incrementAndGet(); - final String message = "[" + getName() + "] Data too large, data for field [" + fieldName + "]" + - " would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" + - ", which is larger than the limit of [" + - memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]"; - logger.debug("{}", message); - throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, Durability.PERMANENT); - } - - /** - * Add a number of bytes, tripping the circuit breaker if the aggregated - * estimates are above the limit. Automatically trips the breaker if the - * memory limit is set to 0. Will never trip the breaker if the limit is - * set < 0, but can still be used to aggregate estimations. - * @param bytes number of bytes to add to the breaker - * @return number of "used" bytes so far - */ - @Override - public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { - // short-circuit on no data allowed, immediately throwing an exception - if (memoryBytesLimit == 0) { - circuitBreak(label, bytes); - } - - long newUsed; - // If there is no limit (-1), we can optimize a bit by using - // .addAndGet() instead of looping (because we don't have to check a - // limit), which makes the RamAccountingTermsEnum case faster. - if (this.memoryBytesLimit == -1) { - newUsed = this.used.addAndGet(bytes); - if (logger.isTraceEnabled()) { - logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]", - new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed)); - } - return newUsed; - } - - // Otherwise, check the addition and commit the addition, looping if - // there are conflicts. May result in additional logging, but it's - // trace logging and shouldn't be counted on for additions. - long currentUsed; - do { - currentUsed = this.used.get(); - newUsed = currentUsed + bytes; - long newUsedWithOverhead = (long)(newUsed * overheadConstant); - if (logger.isTraceEnabled()) { - logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", - new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed), - memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), - newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); - } - if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { - logger.warn("New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking", - newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label, - memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); - circuitBreak(label, newUsedWithOverhead); - } - // Attempt to set the new used value, but make sure it hasn't changed - // underneath us, if it has, keep trying until we are able to set it - } while (!this.used.compareAndSet(currentUsed, newUsed)); - - return newUsed; - } - - /** - * Add an exact number of bytes, not checking for tripping the - * circuit breaker. This bypasses the overheadConstant multiplication. - * @param bytes number of bytes to add to the breaker - * @return number of "used" bytes so far - */ - @Override - public long addWithoutBreaking(long bytes) { - long u = used.addAndGet(bytes); - if (logger.isTraceEnabled()) { - logger.trace("Adjusted breaker by [{}] bytes, now [{}]", bytes, u); - } - assert u >= 0 : "Used bytes: [" + u + "] must be >= 0"; - return u; - } - - /** - * @return the number of aggregated "used" bytes so far - */ - @Override - public long getUsed() { - return this.used.get(); - } - - /** - * @return the number of bytes that can be added before the breaker trips - */ - @Override - public long getLimit() { - return this.memoryBytesLimit; - } - - /** - * @return the constant multiplier the breaker uses for aggregations - */ - @Override - public double getOverhead() { - return this.overheadConstant; - } - - /** - * @return the number of times the breaker has been tripped - */ - @Override - public long getTrippedCount() { - return this.trippedCount.get(); - } - - /** - * @return the name of the breaker - */ - @Override - public String getName() { - return FIELDDATA; - } - - @Override - public Durability getDurability() { - return Durability.PERMANENT; - } -} diff --git a/server/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java b/server/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java index 57b388b89c0..40b07d0a15c 100644 --- a/server/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java +++ b/server/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java @@ -27,7 +27,7 @@ import org.elasticsearch.index.fielddata.plain.AbstractIndexFieldData; import java.io.IOException; /** - * {@link TermsEnum} that takes a MemoryCircuitBreaker, increasing the breaker + * {@link TermsEnum} that takes a CircuitBreaker, increasing the breaker * every time {@code .next(...)} is called. Proxies all methods to the original * TermsEnum otherwise. */ diff --git a/server/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexFieldData.java b/server/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexFieldData.java index c519c6634a2..5eaaf14f8bc 100644 --- a/server/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexFieldData.java +++ b/server/src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexFieldData.java @@ -86,7 +86,7 @@ public abstract class AbstractIndexFieldData extends * A {@code PerValueEstimator} is a sub-class that can be used to estimate * the memory overhead for loading the data. Each field data * implementation should implement its own {@code PerValueEstimator} if it - * intends to take advantage of the MemoryCircuitBreaker. + * intends to take advantage of the CircuitBreaker. *

* Note that the .beforeLoad(...) and .afterLoad(...) methods must be * manually called. diff --git a/server/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java b/server/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java deleted file mode 100644 index 31a84423db9..00000000000 --- a/server/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.breaker; - -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.test.ESTestCase; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; - -/** - * Tests for the Memory Aggregating Circuit Breaker - */ -public class MemoryCircuitBreakerTests extends ESTestCase { - public void testThreadedUpdatesToBreaker() throws Exception { - final int NUM_THREADS = scaledRandomIntBetween(3, 15); - final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500); - final Thread[] threads = new Thread[NUM_THREADS]; - final AtomicBoolean tripped = new AtomicBoolean(false); - final AtomicReference lastException = new AtomicReference<>(null); - - final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue((BYTES_PER_THREAD * NUM_THREADS) - 1), 1.0, logger); - - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(() -> { - for (int j = 0; j < BYTES_PER_THREAD; j++) { - try { - breaker.addEstimateBytesAndMaybeBreak(1L, "test"); - } catch (CircuitBreakingException e) { - if (tripped.get()) { - assertThat("tripped too many times", true, equalTo(false)); - } else { - assertThat(tripped.compareAndSet(false, true), equalTo(true)); - } - } catch (Exception e) { - lastException.set(e); - } - } - }); - - threads[i].start(); - } - - for (Thread t : threads) { - t.join(); - } - - assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); - assertThat("breaker was tripped", tripped.get(), equalTo(true)); - assertThat("breaker was tripped at least once", breaker.getTrippedCount(), greaterThanOrEqualTo(1L)); - } - - public void testConstantFactor() throws Exception { - final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger); - String field = "myfield"; - - // add only 7 bytes - breaker.addWithoutBreaking(7); - - try { - // this won't actually add it because it trips the breaker - breaker.addEstimateBytesAndMaybeBreak(3, field); - fail("should never reach this"); - } catch (CircuitBreakingException cbe) { - } - - // shouldn't throw an exception - breaker.addEstimateBytesAndMaybeBreak(2, field); - - assertThat(breaker.getUsed(), equalTo(9L)); - - // adding 3 more bytes (now at 12) - breaker.addWithoutBreaking(3); - - try { - // Adding no bytes still breaks - breaker.addEstimateBytesAndMaybeBreak(0, field); - fail("should never reach this"); - } catch (CircuitBreakingException cbe) { - assertThat("breaker was tripped exactly twice", breaker.getTrippedCount(), equalTo(2L)); - - long newUsed = (long)(breaker.getUsed() * breaker.getOverhead()); - assertThat(cbe.getMessage().contains("would be [" + newUsed + "/"), equalTo(true)); - assertThat(cbe.getMessage().contains("field [" + field + "]"), equalTo(true)); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 2159d8ed976..a4411930f92 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -23,7 +23,6 @@ package org.elasticsearch.indices.breaker; import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -180,11 +179,11 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { .build(); try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { - CircuitBreaker requestCircuitBreaker = service.getBreaker(MemoryCircuitBreaker.REQUEST); - CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(MemoryCircuitBreaker.FIELDDATA); + CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST); + CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(CircuitBreaker.FIELDDATA); assertEquals(new ByteSizeValue(200, ByteSizeUnit.MB).getBytes(), - service.stats().getStats(MemoryCircuitBreaker.PARENT).getLimit()); + service.stats().getStats(CircuitBreaker.PARENT).getLimit()); assertEquals(new ByteSizeValue(150, ByteSizeUnit.MB).getBytes(), requestCircuitBreaker.getLimit()); assertEquals(new ByteSizeValue(150, ByteSizeUnit.MB).getBytes(), fieldDataCircuitBreaker.getLimit()); @@ -266,8 +265,8 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { .build(); try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings, new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { - CircuitBreaker requestCircuitBreaker = service.getBreaker(MemoryCircuitBreaker.REQUEST); - CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(MemoryCircuitBreaker.FIELDDATA); + CircuitBreaker requestCircuitBreaker = service.getBreaker(CircuitBreaker.REQUEST); + CircuitBreaker fieldDataCircuitBreaker = service.getBreaker(CircuitBreaker.FIELDDATA); CircuitBreaker.Durability expectedDurability; if (randomBoolean()) {