diff --git a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java index 4f9c4e458c6..0671091d503 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -28,10 +28,38 @@ import java.util.Locale; */ public interface CircuitBreaker { + /** + * The parent breaker is a sum of all the following breakers combined. With + * this we allow a single breaker to have a significant amount of memory + * available while still having a "total" limit for all breakers. Note that + * it's not a "real" breaker in that it cannot be added to or subtracted + * from by itself. + */ String PARENT = "parent"; + /** + * The fielddata breaker tracks data used for fielddata (on fields) as well + * as the id cached used for parent/child queries. + */ String FIELDDATA = "fielddata"; + /** + * The request breaker tracks memory used for particular requests. This + * includes allocations for things like the cardinality aggregation, and + * accounting for the number of buckets used in an aggregation request. + * Generally the amounts added to this breaker are released after a request + * is finished. + */ String REQUEST = "request"; + /** + * The in-flight request breaker tracks bytes allocated for reading and + * writing requests on the network layer. + */ String IN_FLIGHT_REQUESTS = "in_flight_requests"; + /** + * The accounting breaker tracks things held in memory that is independent + * of the request lifecycle. This includes memory used by Lucene for + * segments. + */ + String ACCOUNTING = "accounting"; enum Type { // A regular or child MemoryCircuitBreaker diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 12a95a688ae..2bea2a59e16 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -258,6 +258,8 @@ public final class ClusterSettings extends AbstractScopedSettings { HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING, ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 78489965e39..2d5a1dda464 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -129,6 +129,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final BigArrays bigArrays; private final ScriptService scriptService; private final Client client; + private final CircuitBreakerService circuitBreakerService; private Supplier indexSortSupplier; public IndexService( @@ -158,6 +159,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust this.xContentRegistry = xContentRegistry; this.similarityService = similarityService; this.namedWriteableRegistry = namedWriteableRegistry; + this.circuitBreakerService = circuitBreakerService; this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService, mapperRegistry, // we parse all percolator queries as they would be parsed on shard 0 @@ -380,7 +382,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, indexCache, mapperService, similarityService, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, - searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId)); + searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), + circuitBreakerService); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index fbc87f2279b..f923abc1a6c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -73,6 +74,8 @@ public final class EngineConfig { private final Sort indexSort; private final boolean forceNewHistoryUUID; private final TranslogRecoveryRunner translogRecoveryRunner; + @Nullable + private final CircuitBreakerService circuitBreakerService; /** * Index setting to change the low level lucene codec used for writing new segments. @@ -118,7 +121,7 @@ public final class EngineConfig { QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, Sort indexSort, - TranslogRecoveryRunner translogRecoveryRunner) { + TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -147,6 +150,7 @@ public final class EngineConfig { this.refreshListeners = refreshListeners; this.indexSort = indexSort; this.translogRecoveryRunner = translogRecoveryRunner; + this.circuitBreakerService = circuitBreakerService; } /** @@ -358,4 +362,12 @@ public final class EngineConfig { public Sort getIndexSort() { return indexSort; } + + /** + * Returns the circuit breaker service for this engine, or {@code null} if none is to be used. + */ + @Nullable + public CircuitBreakerService getCircuitBreakerService() { + return this.circuitBreakerService; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 82461ca4a99..e431bfb7a5b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -531,7 +531,8 @@ public class InternalEngine extends Engine { try { try { final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); - internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory()); + internalSearcherManager = new SearcherManager(directoryReader, + new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store); ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager, externalSearcherFactory); diff --git a/core/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java b/core/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java new file mode 100644 index 00000000000..7972d426fba --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.SearcherFactory; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.indices.breaker.CircuitBreakerService; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Searcher factory extending {@link EngineSearcherFactory} that tracks the + * amount of memory used by segments in the accounting circuit breaker. + */ +final class RamAccountingSearcherFactory extends SearcherFactory { + + private final CircuitBreakerService breakerService; + + RamAccountingSearcherFactory(CircuitBreakerService breakerService) { + this.breakerService = breakerService; + } + + @Override + public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { + final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING); + + // Construct a list of the previous segment readers, we only want to track memory used + // by new readers, so these will be exempted from the circuit breaking accounting. + // + // The Core CacheKey is used as the key for the set so that deletions still keep the correct + // accounting, as using the Reader or Reader's CacheKey causes incorrect accounting. + final Set prevReaders; + if (previousReader == null) { + prevReaders = Collections.emptySet(); + } else { + final List previousReaderLeaves = previousReader.leaves(); + prevReaders = new HashSet<>(previousReaderLeaves.size()); + for (LeafReaderContext lrc : previousReaderLeaves) { + prevReaders.add(Lucene.segmentReader(lrc.reader()).getCoreCacheHelper().getKey()); + } + } + + for (LeafReaderContext lrc : reader.leaves()) { + final SegmentReader segmentReader = Lucene.segmentReader(lrc.reader()); + // don't add the segment's memory unless it is not referenced by the previous reader + // (only new segments) + if (prevReaders.contains(segmentReader.getCoreCacheHelper().getKey()) == false) { + final long ramBytesUsed = segmentReader.ramBytesUsed(); + // add the segment memory to the breaker (non-breaking) + breaker.addWithoutBreaking(ramBytesUsed); + // and register a listener for when the segment is closed to decrement the + // breaker accounting + segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed)); + } + } + return super.newSearcher(reader, previousReader); + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0f554b6ace1..1dc28915d09 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -125,6 +125,7 @@ import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.TypeMissingException; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; @@ -187,6 +188,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final IndexEventListener indexEventListener; private final QueryCachingPolicy cachingPolicy; private final Supplier indexSortSupplier; + // Package visible for testing + final CircuitBreakerService circuitBreakerService; private final SearchOperationListener searchOperationListener; @@ -258,7 +261,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl Engine.Warmer warmer, List searchOperationListener, List listeners, - Runnable globalCheckpointSyncer) throws IOException { + Runnable globalCheckpointSyncer, + CircuitBreakerService circuitBreakerService) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -289,6 +293,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); state = IndexShardState.CREATED; this.path = path; + this.circuitBreakerService = circuitBreakerService; /* create engine config */ logger.debug("state: [CREATED]"); @@ -2181,7 +2186,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, - this::runTranslogRecovery); + this::runTranslogRecovery, circuitBreakerService); } /** diff --git a/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index b8ec92ba153..9ea8a3df294 100644 --- a/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -63,6 +63,13 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public static final Setting REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); + public static final Setting ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING = + Setting.memorySizeSetting("indices.breaker.accounting.limit", "100%", Property.NodeScope); + public static final Setting ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING = + Setting.doubleSetting("indices.breaker.accounting.overhead", 1.0d, 0.0d, Property.NodeScope); + public static final Setting ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING = + new Setting<>("indices.breaker.accounting.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); + public static final Setting IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.memorySizeSetting("network.breaker.inflight_requests.limit", "100%", Property.Dynamic, Property.NodeScope); public static final Setting IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING = @@ -74,6 +81,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { private volatile BreakerSettings fielddataSettings; private volatile BreakerSettings inFlightRequestsSettings; private volatile BreakerSettings requestSettings; + private volatile BreakerSettings accountingSettings; // Tripped count for when redistribution was attempted but wasn't successful private final AtomicLong parentTripCount = new AtomicLong(0); @@ -98,6 +106,12 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings) ); + this.accountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING, + ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), + ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), + ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING.get(settings) + ); + this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), 1.0, CircuitBreaker.Type.PARENT); @@ -109,6 +123,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { registerBreaker(this.requestSettings); registerBreaker(this.fielddataSettings); registerBreaker(this.inFlightRequestsSettings); + registerBreaker(this.accountingSettings); clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit); clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setFieldDataBreakerLimit); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c40923e3c7c..026a01a23c3 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -2546,7 +2547,7 @@ public class InternalEngineTests extends EngineTestCase { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2600,7 +2601,7 @@ public class InternalEngineTests extends EngineTestCase { threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); @@ -2630,7 +2631,7 @@ public class InternalEngineTests extends EngineTestCase { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 7c38b7c211f..f0e47b69555 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -23,8 +23,11 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -40,6 +43,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; @@ -55,16 +59,21 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerStats; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.io.UncheckedIOException; @@ -94,8 +103,10 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.containsString; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class IndexShardIT extends ESSingleNodeTestCase { @@ -495,7 +506,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { } } }; - final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); + final IndexShard newShard = newIndexShard(indexService, shard, wrapper, getInstanceFromNode(CircuitBreakerService.class), listener); shardRef.set(newShard); recoverShard(newShard); @@ -506,6 +517,65 @@ public class IndexShardIT extends ESSingleNodeTestCase { } } + /** Check that the accounting breaker correctly matches the segments API for memory usage */ + private void checkAccountingBreaker() { + CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class); + CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING); + long usedMem = acctBreaker.getUsed(); + assertThat(usedMem, greaterThan(0L)); + NodesStatsResponse response = client().admin().cluster().prepareNodesStats().setIndices(true).setBreaker(true).get(); + NodeStats stats = response.getNodes().get(0); + assertNotNull(stats); + SegmentsStats segmentsStats = stats.getIndices().getSegments(); + CircuitBreakerStats breakerStats = stats.getBreaker().getStats(CircuitBreaker.ACCOUNTING); + assertEquals(usedMem, segmentsStats.getMemoryInBytes()); + assertEquals(usedMem, breakerStats.getEstimated()); + } + + public void testCircuitBreakerIncrementedByIndexShard() throws Exception { + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("network.breaker.inflight_requests.overhead", 0.0)).get(); + + // Generate a couple of segments + client().prepareIndex("test", "doc", "1").setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON) + .setRefreshPolicy(IMMEDIATE).get(); + // Use routing so 2 documents are guarenteed to be on the same shard + String routing = randomAlphaOfLength(5); + client().prepareIndex("test", "doc", "2").setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON) + .setRefreshPolicy(IMMEDIATE).setRouting(routing).get(); + client().prepareIndex("test", "doc", "3").setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON) + .setRefreshPolicy(IMMEDIATE).setRouting(routing).get(); + + checkAccountingBreaker(); + // Test that force merging causes the breaker to be correctly adjusted + logger.info("--> force merging to a single segment"); + client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).setFlush(randomBoolean()).get(); + client().admin().indices().prepareRefresh().get(); + checkAccountingBreaker(); + + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("indices.breaker.total.limit", "1kb")).get(); + + // Test that we're now above the parent limit due to the segments + Exception e = expectThrows(Exception.class, + () -> client().prepareSearch("test").addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get()); + logger.info("--> got: {}", ExceptionsHelper.detailedMessage(e)); + assertThat(ExceptionsHelper.detailedMessage(e), containsString("[parent] Data too large, data for []")); + + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .putNull("indices.breaker.total.limit") + .putNull("network.breaker.inflight_requests.overhead")).get(); + + // Test that deleting the index causes the breaker to correctly be decremented + logger.info("--> deleting index"); + client().admin().indices().prepareDelete("test").get(); + + // Accounting breaker should now be 0 + CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class); + CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(acctBreaker.getUsed(), equalTo(0L)); + } public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); @@ -516,12 +586,12 @@ public class IndexShardIT extends ESSingleNodeTestCase { } public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, - IndexingOperationListener... listeners) throws IOException { + CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException { ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {}); + indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {}, cbs); return newShard; } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b3a0f4b88de..96bcb9382ee 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -32,6 +32,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -55,6 +56,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -76,6 +78,8 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -155,6 +159,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -2732,4 +2737,148 @@ public class IndexShardTests extends IndexShardTestCase { latch1.await(); closeShards(primary); } + + public void testSegmentMemoryTrackedInBreaker() throws Exception { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"foo\"}"); + primary.refresh("forced refresh"); + + SegmentsStats ss = primary.segmentStats(randomBoolean()); + CircuitBreaker breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(ss.getMemoryInBytes(), equalTo(breaker.getUsed())); + final long preRefreshBytes = ss.getMemoryInBytes(); + + indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + indexDoc(primary, "test", "2", "{\"foo\" : \"baz\"}"); + indexDoc(primary, "test", "3", "{\"foo\" : \"eggplant\"}"); + + ss = primary.segmentStats(randomBoolean()); + breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(preRefreshBytes, equalTo(breaker.getUsed())); + + primary.refresh("refresh"); + + ss = primary.segmentStats(randomBoolean()); + breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(breaker.getUsed(), equalTo(ss.getMemoryInBytes())); + assertThat(breaker.getUsed(), greaterThan(preRefreshBytes)); + + indexDoc(primary, "test", "4", "{\"foo\": \"potato\"}"); + // Forces a refresh with the INTERNAL scope + ((InternalEngine) primary.getEngine()).writeIndexingBuffer(); + + ss = primary.segmentStats(randomBoolean()); + breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(breaker.getUsed(), equalTo(ss.getMemoryInBytes())); + assertThat(breaker.getUsed(), greaterThan(preRefreshBytes)); + final long postRefreshBytes = ss.getMemoryInBytes(); + + // Deleting a doc causes its memory to be freed from the breaker + deleteDoc(primary, "test", "0"); + primary.refresh("force refresh"); + + ss = primary.segmentStats(randomBoolean()); + breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(breaker.getUsed(), lessThan(postRefreshBytes)); + + closeShards(primary); + + breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(breaker.getUsed(), equalTo(0L)); + } + + public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + + int threadCount = randomIntBetween(2, 6); + List threads = new ArrayList<>(threadCount); + int iterations = randomIntBetween(50, 100); + List searchers = Collections.synchronizedList(new ArrayList<>()); + + logger.info("--> running with {} threads and {} iterations each", threadCount, iterations); + for (int threadId = 0; threadId < threadCount; threadId++) { + final String threadName = "thread-" + threadId; + Runnable r = () -> { + for (int i = 0; i < iterations; i++) { + try { + if (randomBoolean()) { + String id = "id-" + threadName + "-" + i; + logger.debug("--> {} indexing {}", threadName, id); + indexDoc(primary, "test", id, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}"); + } + + if (randomBoolean() && i > 10) { + String id = "id-" + threadName + "-" + randomIntBetween(0, i - 1); + logger.debug("--> {}, deleting {}", threadName, id); + deleteDoc(primary, "test", id); + } + + if (randomBoolean()) { + logger.debug("--> {} refreshing", threadName); + primary.refresh("forced refresh"); + } + + if (randomBoolean()) { + String searcherName = "searcher-" + threadName + "-" + i; + logger.debug("--> {} acquiring new searcher {}", threadName, searcherName); + // Acquire a new searcher, adding it to the list + searchers.add(primary.acquireSearcher(searcherName)); + } + + if (randomBoolean() && searchers.size() > 1) { + // Close one of the searchers at random + Engine.Searcher searcher = searchers.remove(0); + logger.debug("--> {} closing searcher {}", threadName, searcher.source()); + IOUtils.close(searcher); + } + } catch (Exception e) { + logger.warn("--> got exception: ", e); + fail("got an exception we didn't expect"); + } + } + + }; + threads.add(new Thread(r, threadName)); + } + threads.stream().forEach(t -> t.start()); + + for (Thread t : threads) { + t.join(); + } + + // Close remaining searchers + IOUtils.close(searchers); + + SegmentsStats ss = primary.segmentStats(randomBoolean()); + CircuitBreaker breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + long segmentMem = ss.getMemoryInBytes(); + long breakerMem = breaker.getUsed(); + logger.info("--> comparing segmentMem: {} - breaker: {} => {}", segmentMem, breakerMem, segmentMem == breakerMem); + assertThat(segmentMem, equalTo(breakerMem)); + + // Close shard + closeShards(primary); + + // Check that the breaker was successfully reset to 0, meaning that all the accounting was correctly applied + breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat(breaker.getUsed(), equalTo(0L)); + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 53ced098c04..e3158a21853 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -122,7 +123,7 @@ public class RefreshListenersTests extends ESTestCase { EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); + TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null, new NoneCircuitBreakerService()); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 2a490c1dcf9..c079ebea840 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardIT; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -440,7 +441,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { shard.writeIndexingBuffer(); } }; - final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, imc); + final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, new NoneCircuitBreakerService(), imc); shardRef.set(newShard); try { assertEquals(0, imc.availableShards().size()); diff --git a/docs/reference/modules/indices/circuit_breaker.asciidoc b/docs/reference/modules/indices/circuit_breaker.asciidoc index 7792e10c684..857f54132cc 100644 --- a/docs/reference/modules/indices/circuit_breaker.asciidoc +++ b/docs/reference/modules/indices/circuit_breaker.asciidoc @@ -72,6 +72,24 @@ memory on a node. The memory usage is based on the content length of the request A constant that all in flight requests estimations are multiplied with to determine a final estimation. Defaults to 1 +[[accounting-circuit-breaker]] +[float] +==== Accounting requests circuit breaker + +The in flight requests circuit breaker allows Elasticsearch to limit the memory +usage of things held in memory that are not released when a request is +completed. This includes things like the Lucene segment memory. + +`network.breaker.accounting.limit`:: + + Limit for accounting breaker, defaults to 100% of JVM heap. This means that it is bound + by the limit configured for the parent circuit breaker. + +`network.breaker.accounting.overhead`:: + + A constant that all accounting estimations are multiplied with to determine a + final estimation. Defaults to 1 + [[script-compilation-circuit-breaker]] [float] ==== Script compilation circuit breaker diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 5c2ef977b16..9ba6f64d74c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -68,6 +68,7 @@ import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -162,7 +163,7 @@ public abstract class EngineTestCase extends ESTestCase { config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort(), config.getTranslogRecoveryRunner()); + config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService()); } @Override @@ -401,7 +402,7 @@ public abstract class EngineTestCase extends ESTestCase { EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler, new NoneCircuitBreakerService()); return config; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index a6cb3ee3b95..4c3e3fb7b48 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.util.BigArrays; @@ -64,6 +65,9 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoverySourceHandler; @@ -289,9 +293,12 @@ public abstract class IndexShardTestCase extends ESTestCase { }; final Engine.Warmer warmer = searcher -> { }; + ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings); indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService, engineFactory, indexEventListener, indexSearcherWrapper, threadPool, - BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer); + BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, + breakerService); success = true; } finally { if (success == false) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java index f8d9c27fd4d..5eb34d96d69 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java @@ -168,6 +168,8 @@ public final class ExternalTestCluster extends TestCluster { for (NodeStats stats : nodeStats.getNodes()) { assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(), stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L)); + assertThat("Accounting breaker not reset to 0 on node: " + stats.getNode(), + stats.getBreaker().getStats(CircuitBreaker.ACCOUNTING).getEstimated(), equalTo(0L)); // ExternalTestCluster does not check the request breaker, // because checking it requires a network request, which in // turn increments the breaker, making it non-0 diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 9ad0afdf6a4..ea9b17f10e3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -2050,6 +2050,8 @@ public final class InternalTestCluster extends TestCluster { final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node); CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.FIELDDATA); assertThat("Fielddata breaker not reset to 0 on node: " + name, fdBreaker.getUsed(), equalTo(0L)); + CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING); + assertThat("Accounting breaker not reset to 0 on node: " + name, acctBreaker.getUsed(), equalTo(0L)); // Anything that uses transport or HTTP can increase the // request breaker (because they use bigarrays), because of // that the breaker can sometimes be incremented from ping