Bucket aggregation circuit breaker optimization. (#46751) (#51730)

Co-authored-by: Howard <danielhuang@tencent.com>
This commit is contained in:
Adrien Grand 2020-01-31 11:30:51 +01:00 committed by GitHub
parent dc99acee66
commit 915a931e93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 84 additions and 25 deletions

View File

@ -302,6 +302,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
}
}
/**
 * Returns the byte limit configured for the parent circuit breaker.
 * Exposed so tests and monitoring code can assert against the effective limit.
 */
public long getParentLimit() {
    return parentSettings.getLimit();
}
/**
* Checks whether the parent breaker has been tripped
*/

View File

@ -534,7 +534,7 @@ public class Node implements Closeable {
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);
responseCollectorService, circuitBreakerService);
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
@ -1035,9 +1035,10 @@ public class Node implements Closeable {
*/
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService) {
return new SearchService(clusterService, indicesService, threadPool,
scriptService, bigArrays, fetchPhase, responseCollectorService);
scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
}
/**

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -59,6 +60,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.script.FieldScript;
@ -197,7 +199,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public SearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService) {
ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
this.clusterService = clusterService;
@ -207,7 +209,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
this.bigArrays = bigArrays;
this.queryPhase = new QueryPhase();
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings,
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
@ -41,9 +42,12 @@ public class MultiBucketConsumerService {
public static final Setting<Integer> MAX_BUCKET_SETTING =
Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
private final CircuitBreaker breaker;
private volatile int maxBucket;
public MultiBucketConsumerService(ClusterService clusterService, Settings settings) {
public MultiBucketConsumerService(ClusterService clusterService, Settings settings, CircuitBreaker breaker) {
this.breaker = breaker;
this.maxBucket = MAX_BUCKET_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
}
@ -94,11 +98,14 @@ public class MultiBucketConsumerService {
*/
public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final CircuitBreaker breaker;
// aggregations execute in a single thread so no atomic here
private int count;
public MultiBucketConsumer(int limit) {
/**
 * @param limit   maximum number of buckets allowed before this consumer trips
 * @param breaker request circuit breaker consulted periodically as buckets accumulate
 */
public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
    this.breaker = breaker;
    this.limit = limit;
}
@Override
@ -109,6 +116,11 @@ public class MultiBucketConsumerService {
+ "] but was [" + count + "]. This limit can be set by changing the [" +
MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
}
// check parent circuit breaker every 1024 buckets
if (value > 0 && (count & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
}
public void reset() {
@ -125,6 +137,6 @@ public class MultiBucketConsumerService {
}
/**
 * Creates a {@link MultiBucketConsumer} wired to the current
 * {@code search.max_buckets} limit and the shared request breaker.
 */
public MultiBucketConsumer create() {
    // The pre-refactor single-argument call was left in place alongside the
    // new call, producing two return statements (the second unreachable and
    // a compile error). Keep only the breaker-aware version.
    return new MultiBucketConsumer(maxBucket, breaker);
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.atomic.AtomicBoolean;
@ -293,6 +294,32 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
}
}
// Verifies that MultiBucketConsumer.accept consults the parent circuit breaker:
// with real-memory tracking disabled and the parent limit set to 100 bytes,
// pre-charging 200 bytes on the request breaker must make the 1024-bucket
// check trip with a "[parent] Data too large" CircuitBreakingException.
public void testAllocationBucketsBreaker() throws Exception {
// Tiny parent limit so any real accounting trips it; real-memory usage is
// disabled so only explicitly added estimates count toward the parent.
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b")
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), "false")
.build();
try (CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
// Sanity-check that the 100b setting took effect via the new accessor.
long parentLimitBytes = ((HierarchyCircuitBreakerService) service).getParentLimit();
assertEquals(new ByteSizeValue(100, ByteSizeUnit.BYTES).getBytes(), parentLimitBytes);
CircuitBreaker breaker = service.getBreaker(CircuitBreaker.REQUEST);
// Bucket limit (10000) is deliberately higher than the bucket count below,
// so only the breaker check — not the max_buckets check — can fail.
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer =
new MultiBucketConsumerService.MultiBucketConsumer(10000, breaker);
// make sure used bytes is greater than the total circuit breaker limit
breaker.addWithoutBreaking(200);
// 1024 buckets hits the consumer's periodic breaker check exactly once.
CircuitBreakingException exception =
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024));
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be"));
assertThat(exception.getMessage(), containsString("which is larger than the limit of [100/100b]"));
}
}
/** Converts a megabyte count into its raw byte value for breaker assertions. */
private long mb(long size) {
    final ByteSizeValue megabytes = new ByteSizeValue(size, ByteSizeUnit.MB);
    return megabytes.getBytes();
}

View File

@ -1362,7 +1362,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService());
actions.put(SearchAction.INSTANCE,
new TransportSearchAction(threadPool, transportService, searchService,
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,

View File

@ -123,12 +123,14 @@ public class MockNode extends Node {
@Override
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService) {
if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
responseCollectorService);
responseCollectorService, circuitBreakerService);
}
return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
return new MockSearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, fetchPhase, circuitBreakerService);
}
@Override

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
@ -68,8 +69,8 @@ public class MockSearchService extends SearchService {
public MockSearchService(ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
BigArrays bigArrays, FetchPhase fetchPhase) {
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);
BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) {
super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService);
}
@Override

View File

@ -34,6 +34,7 @@ import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@ -130,7 +131,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
IndexSearcher indexSearcher,
MappedFieldType... fieldTypes) throws IOException {
return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(),
new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes);
}
protected <A extends Aggregator> A createAggregator(Query query,
@ -139,7 +140,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
IndexSettings indexSettings,
MappedFieldType... fieldTypes) throws IOException {
return createAggregator(query, aggregationBuilder, indexSearcher, indexSettings,
new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)), fieldTypes);
}
protected <A extends Aggregator> A createAggregator(Query query, AggregationBuilder aggregationBuilder,
@ -329,7 +330,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes);
a.preCollection();
searcher.search(query, a);
@ -393,11 +395,13 @@ public abstract class AggregatorTestCase extends ESTestCase {
List<InternalAggregation> aggs = new ArrayList<> ();
Query rewritten = searcher.rewrite(query);
Weight weight = searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
for (ShardSearcher subSearcher : subSearchers) {
MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket);
MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes);
a.preCollection();
subSearcher.search(weight, a);
@ -415,7 +419,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
Collections.shuffle(aggs, random());
int r = randomIntBetween(1, toReduceSize);
List<InternalAggregation> toReduce = aggs.subList(0, r);
MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(),
reduceBucketConsumer, false);
@ -425,7 +430,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
aggs.add(reduced);
}
// now do the final reduce
MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.test;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
@ -269,7 +270,8 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
Collections.shuffle(toReduce, random());
int r = randomIntBetween(1, toReduceSize);
List<InternalAggregation> internalAggregations = toReduce.subList(0, r);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false);
@SuppressWarnings("unchecked")
@ -285,7 +287,8 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
toReduce.add(reduced);
}
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true);
@SuppressWarnings("unchecked")