Add accounting circuit breaker and track segment memory usage (#27116)

* Add accounting circuit breaker and track segment memory usage

This commit adds a new circuit breaker "accounting" that is used for tracking
the memory usage of non-request-tied memory users. It also adds tracking for the
amount of Lucene segment memory used by a shard as a user of the new circuit
breaker.

The Lucene segment memory is updated when the shard refreshes, and removed when
the shard relocates away from a node or is deleted. It should also be noted that
all tracking for segment memory uses `addWithoutBreaking` so as not to fail the
shard if a limit is reached.

The `accounting` breaker has a default limit of 100% and will contribute to the
parent breaker limit.

Resolves #27044
This commit is contained in:
Lee Hinman 2017-12-01 07:59:45 -07:00 committed by GitHub
parent 5060007d20
commit 623d3700f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 418 additions and 16 deletions

View File

@ -28,10 +28,38 @@ import java.util.Locale;
*/
public interface CircuitBreaker {
/**
* The parent breaker is a sum of all the following breakers combined. With
* this we allow a single breaker to have a significant amount of memory
* available while still having a "total" limit for all breakers. Note that
* it's not a "real" breaker in that it cannot be added to or subtracted
* from by itself.
*/
String PARENT = "parent";
/**
* The fielddata breaker tracks data used for fielddata (on fields) as well
* as the id cache used for parent/child queries.
*/
String FIELDDATA = "fielddata";
/**
* The request breaker tracks memory used for particular requests. This
* includes allocations for things like the cardinality aggregation, and
* accounting for the number of buckets used in an aggregation request.
* Generally the amounts added to this breaker are released after a request
* is finished.
*/
String REQUEST = "request";
/**
* The in-flight request breaker tracks bytes allocated for reading and
* writing requests on the network layer.
*/
String IN_FLIGHT_REQUESTS = "in_flight_requests";
/**
* The accounting breaker tracks things held in memory that are independent
* of the request lifecycle. This includes memory used by Lucene for
* segments.
*/
String ACCOUNTING = "accounting";
enum Type {
// A regular or child MemoryCircuitBreaker

View File

@ -258,6 +258,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,

View File

@ -129,6 +129,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final Client client;
private final CircuitBreakerService circuitBreakerService;
private Supplier<Sort> indexSortSupplier;
public IndexService(
@ -158,6 +159,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.xContentRegistry = xContentRegistry;
this.similarityService = similarityService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
@ -380,7 +382,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId),
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();

View File

@ -39,6 +39,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -73,6 +74,8 @@ public final class EngineConfig {
private final Sort indexSort;
private final boolean forceNewHistoryUUID;
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
/**
* Index setting to change the low level lucene codec used for writing new segments.
@ -118,7 +121,7 @@ public final class EngineConfig {
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> refreshListeners, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner) {
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
@ -147,6 +150,7 @@ public final class EngineConfig {
this.refreshListeners = refreshListeners;
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
}
/**
@ -358,4 +362,12 @@ public final class EngineConfig {
public Sort getIndexSort() {
return indexSort;
}
/**
 * Exposes the {@link CircuitBreakerService} this engine configuration was built with.
 *
 * @return the configured circuit breaker service, or {@code null} when none is to be used
 */
@Nullable
public CircuitBreakerService getCircuitBreakerService() {
    return circuitBreakerService;
}
}

View File

@ -531,7 +531,8 @@ public class InternalEngine extends Engine {
try {
try {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory());
internalSearcherManager = new SearcherManager(directoryReader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Searcher factory extending {@link SearcherFactory} that tracks the
 * amount of memory used by segments in the accounting circuit breaker.
 * <p>
 * Memory is only ever added with {@code addWithoutBreaking}, so creating a
 * searcher never fails because of the breaker limit. The bytes added for a
 * segment are subtracted again by a closed-listener registered on that
 * segment's core cache helper.
 */
final class RamAccountingSearcherFactory extends SearcherFactory {

    /** Source of the accounting breaker; may be {@code null}, in which case no accounting is done. */
    private final CircuitBreakerService breakerService;

    /**
     * @param breakerService service used to look up the accounting breaker, or {@code null}
     *                       to create searchers without any memory accounting
     */
    RamAccountingSearcherFactory(CircuitBreakerService breakerService) {
        this.breakerService = breakerService;
    }

    /**
     * Accounts for the memory of every segment in {@code reader} that was not part of
     * {@code previousReader}, then delegates the searcher creation to {@link SearcherFactory}.
     *
     * @param reader         the reader the new searcher is opened on
     * @param previousReader the reader of the previous searcher, or {@code null} if there is none
     * @return the searcher created by the superclass
     * @throws IOException if the superclass fails to create the searcher
     */
    @Override
    public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
        // The engine configuration allows a null breaker service ("none is to be used");
        // skip the accounting in that case instead of failing with a NullPointerException.
        if (breakerService != null) {
            accountForNewSegments(reader, previousReader);
        }
        return super.newSearcher(reader, previousReader);
    }

    /** Adds the RAM usage of segments that are new in {@code reader} to the accounting breaker. */
    private void accountForNewSegments(IndexReader reader, IndexReader previousReader) {
        final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);

        // Construct a list of the previous segment readers, we only want to track memory used
        // by new readers, so these will be exempted from the circuit breaking accounting.
        //
        // The Core CacheKey is used as the key for the set so that deletions still keep the correct
        // accounting, as using the Reader or Reader's CacheKey causes incorrect accounting.
        final Set<IndexReader.CacheKey> prevReaders;
        if (previousReader == null) {
            prevReaders = Collections.emptySet();
        } else {
            final List<LeafReaderContext> previousReaderLeaves = previousReader.leaves();
            prevReaders = new HashSet<>(previousReaderLeaves.size());
            for (LeafReaderContext lrc : previousReaderLeaves) {
                prevReaders.add(Lucene.segmentReader(lrc.reader()).getCoreCacheHelper().getKey());
            }
        }

        for (LeafReaderContext lrc : reader.leaves()) {
            final SegmentReader segmentReader = Lucene.segmentReader(lrc.reader());
            // don't add the segment's memory unless it is not referenced by the previous reader
            // (only new segments)
            if (prevReaders.contains(segmentReader.getCoreCacheHelper().getKey()) == false) {
                // captured once so that exactly the same amount is subtracted when the segment closes
                final long ramBytesUsed = segmentReader.ramBytesUsed();
                // add the segment memory to the breaker (non-breaking)
                breaker.addWithoutBreaking(ramBytesUsed);
                // and register a listener for when the segment is closed to decrement the
                // breaker accounting
                segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed));
            }
        }
    }
}

View File

@ -125,6 +125,7 @@ import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
@ -187,6 +188,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;
// Package visible for testing
final CircuitBreakerService circuitBreakerService;
private final SearchOperationListener searchOperationListener;
@ -258,7 +261,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
Engine.Warmer warmer,
List<SearchOperationListener> searchOperationListener,
List<IndexingOperationListener> listeners,
Runnable globalCheckpointSyncer) throws IOException {
Runnable globalCheckpointSyncer,
CircuitBreakerService circuitBreakerService) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;
@ -289,6 +293,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
state = IndexShardState.CREATED;
this.path = path;
this.circuitBreakerService = circuitBreakerService;
/* create engine config */
logger.debug("state: [CREATED]");
@ -2181,7 +2186,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
this::runTranslogRecovery);
this::runTranslogRecovery, circuitBreakerService);
}
/**

View File

@ -63,6 +63,13 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);
public static final Setting<ByteSizeValue> ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.memorySizeSetting("indices.breaker.accounting.limit", "100%", Property.NodeScope);
public static final Setting<Double> ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING =
Setting.doubleSetting("indices.breaker.accounting.overhead", 1.0d, 0.0d, Property.NodeScope);
public static final Setting<CircuitBreaker.Type> ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("indices.breaker.accounting.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);
public static final Setting<ByteSizeValue> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.memorySizeSetting("network.breaker.inflight_requests.limit", "100%", Property.Dynamic, Property.NodeScope);
public static final Setting<Double> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING =
@ -74,6 +81,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings inFlightRequestsSettings;
private volatile BreakerSettings requestSettings;
private volatile BreakerSettings accountingSettings;
// Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0);
@ -98,6 +106,12 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);
this.accountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING,
ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);
this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT,
TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), 1.0,
CircuitBreaker.Type.PARENT);
@ -109,6 +123,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
registerBreaker(this.requestSettings);
registerBreaker(this.fielddataSettings);
registerBreaker(this.inFlightRequestsSettings);
registerBreaker(this.accountingSettings);
clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setFieldDataBreakerLimit);

View File

@ -116,6 +116,7 @@ import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@ -2546,7 +2547,7 @@ public class InternalEngineTests extends EngineTestCase {
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5),
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService());
try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
@ -2600,7 +2601,7 @@ public class InternalEngineTests extends EngineTestCase {
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService());
engine = new InternalEngine(newConfig);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
engine.recoverFromTranslog();
@ -2630,7 +2631,7 @@ public class InternalEngineTests extends EngineTestCase {
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService());
engine = new InternalEngine(newConfig);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
engine.recoverFromTranslog();

View File

@ -23,8 +23,11 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
@ -40,6 +43,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
@ -55,16 +59,21 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -94,8 +103,10 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class IndexShardIT extends ESSingleNodeTestCase {
@ -495,7 +506,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
}
}
};
final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener);
final IndexShard newShard = newIndexShard(indexService, shard, wrapper, getInstanceFromNode(CircuitBreakerService.class), listener);
shardRef.set(newShard);
recoverShard(newShard);
@ -506,6 +517,65 @@ public class IndexShardIT extends ESSingleNodeTestCase {
}
}
/**
 * Asserts that the accounting breaker currently holds a non-zero amount of memory and that
 * this amount equals both the segment memory and the accounting breaker estimate reported
 * by the nodes stats API.
 */
private void checkAccountingBreaker() {
    final CircuitBreaker accounting = getInstanceFromNode(CircuitBreakerService.class).getBreaker(CircuitBreaker.ACCOUNTING);
    final long breakerBytes = accounting.getUsed();
    assertThat(breakerBytes, greaterThan(0L));

    final NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats()
        .setIndices(true)
        .setBreaker(true)
        .get();
    final NodeStats firstNode = nodesStats.getNodes().get(0);
    assertNotNull(firstNode);

    final SegmentsStats segments = firstNode.getIndices().getSegments();
    final CircuitBreakerStats accountingStats = firstNode.getBreaker().getStats(CircuitBreaker.ACCOUNTING);
    assertEquals(breakerBytes, segments.getMemoryInBytes());
    assertEquals(breakerBytes, accountingStats.getEstimated());
}
/**
 * Indexes three documents with immediate refresh, then checks the accounting breaker against the
 * segment stats before and after a force merge to a single segment. Afterwards the parent breaker
 * limit is lowered to 1kb and a terms aggregation is expected to be rejected by the parent breaker.
 * Finally the index is deleted and the accounting breaker is expected to report 0 bytes used.
 */
public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
// NOTE(review): presumably zeroes the in-flight requests overhead so that request bytes on the
// network layer do not influence the parent breaker check below -- confirm
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("network.breaker.inflight_requests.overhead", 0.0)).get();
// Generate a couple of segments
client().prepareIndex("test", "doc", "1").setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
.setRefreshPolicy(IMMEDIATE).get();
// Use routing so 2 documents are guaranteed to be on the same shard
String routing = randomAlphaOfLength(5);
client().prepareIndex("test", "doc", "2").setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
.setRefreshPolicy(IMMEDIATE).setRouting(routing).get();
client().prepareIndex("test", "doc", "3").setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
.setRefreshPolicy(IMMEDIATE).setRouting(routing).get();
checkAccountingBreaker();
// Test that force merging causes the breaker to be correctly adjusted
logger.info("--> force merging to a single segment");
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).setFlush(randomBoolean()).get();
client().admin().indices().prepareRefresh().get();
checkAccountingBreaker();
// Lower the parent (total) limit to 1kb
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("indices.breaker.total.limit", "1kb")).get();
// Test that we're now above the parent limit due to the segments
Exception e = expectThrows(Exception.class,
() -> client().prepareSearch("test").addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get());
logger.info("--> got: {}", ExceptionsHelper.detailedMessage(e));
assertThat(ExceptionsHelper.detailedMessage(e), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
// Remove both transient settings that this test changed
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull("indices.breaker.total.limit")
.putNull("network.breaker.inflight_requests.overhead")).get();
// Test that deleting the index causes the breaker to correctly be decremented
logger.info("--> deleting index");
client().admin().indices().prepareDelete("test").get();
// Accounting breaker should now be 0
CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class);
CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(acctBreaker.getUsed(), equalTo(0L));
}
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
@ -516,12 +586,12 @@ public class IndexShardIT extends ESSingleNodeTestCase {
}
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
IndexingOperationListener... listeners) throws IOException {
CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {});
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {}, cbs);
return newShard;
}

View File

@ -32,6 +32,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -55,6 +56,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -76,6 +78,8 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@ -155,6 +159,7 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -2732,4 +2737,148 @@ public class IndexShardTests extends IndexShardTestCase {
latch1.await();
closeShards(primary);
}
/**
 * Walks a single primary shard through indexing, refreshing and deleting documents and compares the
 * bytes used in the accounting breaker with the memory reported by the shard's segment stats after
 * each step. After the shard is closed the breaker is expected to report 0 bytes used.
 */
public void testSegmentMemoryTrackedInBreaker() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
// One document plus a refresh: breaker usage must equal the reported segment memory
indexDoc(primary, "test", "0", "{\"foo\" : \"foo\"}");
primary.refresh("forced refresh");
SegmentsStats ss = primary.segmentStats(randomBoolean());
CircuitBreaker breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(ss.getMemoryInBytes(), equalTo(breaker.getUsed()));
final long preRefreshBytes = ss.getMemoryInBytes();
// Index more documents without refreshing: the breaker is expected to still hold the earlier amount
indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}");
indexDoc(primary, "test", "2", "{\"foo\" : \"baz\"}");
indexDoc(primary, "test", "3", "{\"foo\" : \"eggplant\"}");
ss = primary.segmentStats(randomBoolean());
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(preRefreshBytes, equalTo(breaker.getUsed()));
// After the refresh the breaker must match the segment stats again and have grown
primary.refresh("refresh");
ss = primary.segmentStats(randomBoolean());
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(breaker.getUsed(), equalTo(ss.getMemoryInBytes()));
assertThat(breaker.getUsed(), greaterThan(preRefreshBytes));
indexDoc(primary, "test", "4", "{\"foo\": \"potato\"}");
// Forces a refresh with the INTERNAL scope
((InternalEngine) primary.getEngine()).writeIndexingBuffer();
ss = primary.segmentStats(randomBoolean());
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(breaker.getUsed(), equalTo(ss.getMemoryInBytes()));
assertThat(breaker.getUsed(), greaterThan(preRefreshBytes));
final long postRefreshBytes = ss.getMemoryInBytes();
// Deleting a doc causes its memory to be freed from the breaker
deleteDoc(primary, "test", "0");
primary.refresh("force refresh");
ss = primary.segmentStats(randomBoolean());
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(breaker.getUsed(), lessThan(postRefreshBytes));
// Closing the shard must release everything that was accounted for
closeShards(primary);
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(breaker.getUsed(), equalTo(0L));
}
/**
 * Runs several threads that randomly index, delete, refresh and acquire/release searchers on a single
 * primary shard. Once all threads have finished and the remaining searchers are closed, the accounting
 * breaker must equal the memory reported by the segment stats, and after the shard is closed it must
 * report 0 bytes used.
 */
public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {
    Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .build();
    IndexMetaData metaData = IndexMetaData.builder("test")
        .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
        .settings(settings)
        .primaryTerm(0, 1).build();
    IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
    recoverShardFromStore(primary);

    int threadCount = randomIntBetween(2, 6);
    List<Thread> threads = new ArrayList<>(threadCount);
    int iterations = randomIntBetween(50, 100);
    // Shared between all worker threads; the synchronized wrapper uses the list itself as its lock
    List<Engine.Searcher> searchers = Collections.synchronizedList(new ArrayList<>());

    logger.info("--> running with {} threads and {} iterations each", threadCount, iterations);
    for (int threadId = 0; threadId < threadCount; threadId++) {
        final String threadName = "thread-" + threadId;
        Runnable r = () -> {
            for (int i = 0; i < iterations; i++) {
                try {
                    if (randomBoolean()) {
                        String id = "id-" + threadName + "-" + i;
                        logger.debug("--> {} indexing {}", threadName, id);
                        indexDoc(primary, "test", id, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
                    }

                    if (randomBoolean() && i > 10) {
                        String id = "id-" + threadName + "-" + randomIntBetween(0, i - 1);
                        logger.debug("--> {}, deleting {}", threadName, id);
                        deleteDoc(primary, "test", id);
                    }

                    if (randomBoolean()) {
                        logger.debug("--> {} refreshing", threadName);
                        primary.refresh("forced refresh");
                    }

                    if (randomBoolean()) {
                        String searcherName = "searcher-" + threadName + "-" + i;
                        logger.debug("--> {} acquiring new searcher {}", threadName, searcherName);
                        // Acquire a new searcher, adding it to the list
                        searchers.add(primary.acquireSearcher(searcherName));
                    }

                    if (randomBoolean() && searchers.size() > 1) {
                        // Close the oldest searcher. The size check above and the removal are two
                        // separate calls, so other threads may have drained the list in between;
                        // re-check and remove while holding the list's lock.
                        synchronized (searchers) {
                            if (searchers.size() > 1) {
                                Engine.Searcher searcher = searchers.remove(0);
                                logger.debug("--> {} closing searcher {}", threadName, searcher.source());
                                IOUtils.close(searcher);
                            }
                        }
                    }
                } catch (Exception e) {
                    logger.warn("--> got exception: ", e);
                    fail("got an exception we didn't expect");
                }
            }
        };
        threads.add(new Thread(r, threadName));
    }
    threads.forEach(Thread::start);

    for (Thread t : threads) {
        t.join();
    }

    // Close remaining searchers
    IOUtils.close(searchers);

    SegmentsStats ss = primary.segmentStats(randomBoolean());
    CircuitBreaker breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
    long segmentMem = ss.getMemoryInBytes();
    long breakerMem = breaker.getUsed();
    logger.info("--> comparing segmentMem: {} - breaker: {} => {}", segmentMem, breakerMem, segmentMem == breakerMem);
    assertThat(segmentMem, equalTo(breakerMem));

    // Close shard
    closeShards(primary);

    // Check that the breaker was successfully reset to 0, meaning that all the accounting was correctly applied
    breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
    assertThat(breaker.getUsed(), equalTo(0L));
}
}

View File

@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -122,7 +123,7 @@ public class RefreshListenersTests extends ESTestCase {
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null);
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null, new NoneCircuitBreakerService());
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardIT;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -440,7 +441,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
shard.writeIndexingBuffer();
}
};
final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, imc);
final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, new NoneCircuitBreakerService(), imc);
shardRef.set(newShard);
try {
assertEquals(0, imc.availableShards().size());

View File

@ -72,6 +72,24 @@ memory on a node. The memory usage is based on the content length of the request
A constant that all in flight requests estimations are multiplied with to determine a
final estimation. Defaults to 1
[[accounting-circuit-breaker]]
[float]
==== Accounting requests circuit breaker
The accounting circuit breaker allows Elasticsearch to limit the memory
usage of things held in memory that are not released when a request is
completed. This includes things like the Lucene segment memory.
`indices.breaker.accounting.limit`::
Limit for accounting breaker, defaults to 100% of JVM heap. This means that it is bound
by the limit configured for the parent circuit breaker.
`indices.breaker.accounting.overhead`::
A constant that all accounting estimations are multiplied with to determine a
final estimation. Defaults to 1
[[script-compilation-circuit-breaker]]
[float]
==== Script compilation circuit breaker

View File

@ -68,6 +68,7 @@ import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -162,7 +163,7 @@ public abstract class EngineTestCase extends ESTestCase {
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
config.getIndexSort(), config.getTranslogRecoveryRunner());
config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService());
}
@Override
@ -401,7 +402,7 @@ public abstract class EngineTestCase extends ESTestCase {
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler, new NoneCircuitBreakerService());
return config;
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays;
@ -64,6 +65,9 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySourceHandler;
@ -289,9 +293,12 @@ public abstract class IndexShardTestCase extends ESTestCase {
};
final Engine.Warmer warmer = searcher -> {
};
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService,
engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer);
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer,
breakerService);
success = true;
} finally {
if (success == false) {

View File

@ -168,6 +168,8 @@ public final class ExternalTestCluster extends TestCluster {
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
assertThat("Accounting breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.ACCOUNTING).getEstimated(), equalTo(0L));
// ExternalTestCluster does not check the request breaker,
// because checking it requires a network request, which in
// turn increments the breaker, making it non-0

View File

@ -2050,6 +2050,8 @@ public final class InternalTestCluster extends TestCluster {
final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);
CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.FIELDDATA);
assertThat("Fielddata breaker not reset to 0 on node: " + name, fdBreaker.getUsed(), equalTo(0L));
CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat("Accounting breaker not reset to 0 on node: " + name, acctBreaker.getUsed(), equalTo(0L));
// Anything that uses transport or HTTP can increase the
// request breaker (because they use bigarrays), because of
// that the breaker can sometimes be incremented from ping