Add HierarchyCircuitBreakerService

Adds a breaker for request BigArrays, which are used for parent/child
queries as well as some aggregations. Certain operations like Netty HTTP
responses and transport responses increment the breaker, but will not
trip.

This also changes the output of the nodes' stats endpoint to show the
parent breaker as well as the fielddata and request breakers.

There are a number of new settings for breakers now:

`indices.breaker.total.limit`: starting limit for all memory-use breaker,
defaults to 70%

`indices.breaker.fielddata.limit`: starting limit for fielddata breaker,
defaults to 60%
`indices.breaker.fielddata.overhead`: overhead for fielddata breaker
estimations, defaults to 1.03

(the fielddata breaker settings also use the backwards-compatible
setting `indices.fielddata.breaker.limit` and
`indices.fielddata.breaker.overhead`)

`indices.breaker.request.limit`: starting limit for request breaker,
defaults to 40%
`indices.breaker.request.overhead`: request breaker estimation overhead,
defaults to 1.0

The breaker service infrastructure is now generic and opens the path to
adding additional circuit breakers in the future.

Fixes #6129

Conflicts:
	src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java
	src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java
	src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java
	src/main/java/org/elasticsearch/index/fielddata/ordinals/InternalGlobalOrdinalsBuilder.java
	src/main/java/org/elasticsearch/index/fielddata/plain/AbstractIndexOrdinalsFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/IndexIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java
	src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java
	src/main/java/org/elasticsearch/index/fielddata/plain/SortedSetDVOrdinalsIndexFieldData.java
	src/main/java/org/elasticsearch/node/internal/InternalNode.java
	src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java
	src/test/java/org/elasticsearch/index/codec/CodecTests.java
	src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java
	src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java
	src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java
	src/test/java/org/elasticsearch/index/query/IndexQueryParserFilterCachingTests.java
	src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java
	src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java
	src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java
	src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java
	src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java
This commit is contained in:
Lee Hinman 2014-05-21 13:37:03 +02:00
parent be86556946
commit 6abe4c951d
59 changed files with 1640 additions and 415 deletions

View File

@ -24,28 +24,63 @@ field data after a certain time of inactivity. Defaults to `-1`. For
example, can be set to `5m` for a 5 minute expiry.
|=======================================================================
[float]
[[circuit-breaker]]
=== Circuit Breaker
coming[1.4.0,Prior to 1.4.0 there was only a single circuit breaker for fielddata]
Elasticsearch contains multiple circuit breakers used to prevent operations from
causing an OutOfMemoryError. Each breaker specifies a limit for how much memory
it can use. Additionally, there is a parent-level breaker that specifies the
total amount of memory that can be used across all breakers.
The parent-level breaker can be configured with the following setting:
`indices.breaker.total.limit`::
Starting limit for overall parent breaker, defaults to 70% of JVM heap
All circuit breaker settings can be changed dynamically using the cluster update
settings API.
[float]
[[fielddata-circuit-breaker]]
=== Field data circuit breaker
==== Field data circuit breaker
The field data circuit breaker allows Elasticsearch to estimate the amount of
memory a field will required to be loaded into memory. It can then prevent the
field data loading by raising an exception. By default the limit is configured
to 60% of the maximum JVM heap. It can be configured with the following
parameters:
[cols="<,<",options="header",]
|=======================================================================
|Setting |Description
|`indices.fielddata.breaker.limit` |Maximum size of estimated field data
to allow loading. Defaults to 60% of the maximum JVM heap.
|`indices.fielddata.breaker.overhead` |A constant that all field data
estimations are multiplied with to determine a final estimation. Defaults to
1.03
|=======================================================================
`indices.breaker.fielddata.limit`::
Limit for fielddata breaker, defaults to 60% of JVM heap
Both the `indices.fielddata.breaker.limit` and
`indices.fielddata.breaker.overhead` can be changed dynamically using the
cluster update settings API.
`indices.breaker.fielddata.overhead`::
A constant that all field data estimations are multiplied with to determine a
final estimation. Defaults to 1.03
`indices.fielddata.breaker.limit`::
deprecated[1.4.0,Replaced by `indices.breaker.fielddata.limit`]
`indices.fielddata.breaker.overhead`::
deprecated[1.4.0,Replaced by `indices.breaker.fielddata.overhead`]
[float]
[[request-circuit-breaker]]
==== Request circuit breaker
coming[1.4.0]
The request circuit breaker allows Elasticsearch to prevent per-request data
structures (for example, memory used for calculating aggregations during a
request) from exceeding a certain amount of memory.
`indices.breaker.request.limit`::
Limit for request breaker, defaults to 40% of JVM heap
`indices.breaker.request.overhead`::
A constant that all request estimations are multiplied with to determine a
final estimation. Defaults to 1
[float]
[[fielddata-monitoring]]

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
@ -75,7 +75,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
private HttpStats http;
@Nullable
private FieldDataBreakerStats breaker;
private AllCircuitBreakerStats breaker;
NodeStats() {
}
@ -83,7 +83,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool,
@Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http,
@Nullable FieldDataBreakerStats breaker) {
@Nullable AllCircuitBreakerStats breaker) {
super(node);
this.timestamp = timestamp;
this.indices = indices;
@ -174,7 +174,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
}
@Nullable
public FieldDataBreakerStats getBreaker() {
public AllCircuitBreakerStats getBreaker() {
return this.breaker;
}
@ -215,7 +215,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
if (in.readBoolean()) {
http = HttpStats.readHttpStats(in);
}
breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in);
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
}
@Override

View File

@ -76,6 +76,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.EnvironmentModule;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.PluginsModule;
@ -187,6 +188,7 @@ public class TransportClient extends AbstractClient {
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));
injector = modules.createInjector();

View File

@ -28,8 +28,9 @@ import org.elasticsearch.cluster.routing.allocation.decider.*;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -85,6 +86,11 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
}
public void addDynamicSettings(String... settings) {

View File

@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.breaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import java.util.concurrent.atomic.AtomicLong;
/**
* Breaker that will check a parent's when incrementing
*/
public class ChildMemoryCircuitBreaker implements CircuitBreaker {
private final long memoryBytesLimit;
private final BreakerSettings settings;
private final double overheadConstant;
private final AtomicLong used;
private final AtomicLong trippedCount;
private final ESLogger logger;
private final HierarchyCircuitBreakerService parent;
private final Name name;
/**
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. This breaker starts with 0 bytes used.
* @param settings settings to configure this breaker
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, ESLogger logger,
HierarchyCircuitBreakerService parent, Name name) {
this(settings, null, logger, parent, name);
}
/**
* Create a circuit breaker that will break if the number of estimated
* bytes grows above the limit. All estimations will be multiplied by
* the given overheadConstant. Uses the given oldBreaker to initialize
* the starting offset.
* @param settings settings to configure this breaker
* @param parent parent circuit breaker service to delegate tripped breakers to
* @param name the name of the breaker
* @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
*/
public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
ESLogger logger, HierarchyCircuitBreakerService parent, Name name) {
this.name = name;
this.settings = settings;
this.memoryBytesLimit = settings.getLimit();
this.overheadConstant = settings.getOverhead();
if (oldBreaker == null) {
this.used = new AtomicLong(0);
this.trippedCount = new AtomicLong(0);
} else {
this.used = oldBreaker.used;
this.trippedCount = oldBreaker.trippedCount;
}
this.logger = logger;
if (logger.isTraceEnabled()) {
logger.trace("creating ChildCircuitBreaker with settings {}", this.settings);
}
this.parent = parent;
}
/**
* Method used to trip the breaker, delegates to the parent to determine
* whether to trip the breaker or not
*/
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {
this.trippedCount.incrementAndGet();
throw new CircuitBreakingException("[" + this.name + "] Data too large, data for [" +
fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]",
bytesNeeded, this.memoryBytesLimit);
}
/**
* Add a number of bytes, tripping the circuit breaker if the aggregated
* estimates are above the limit. Automatically trips the breaker if the
* memory limit is set to 0. Will never trip the breaker if the limit is
* set < 0, but can still be used to aggregate estimations.
* @param bytes number of bytes to add to the breaker
* @return number of "used" bytes so far
* @throws CircuitBreakingException
*/
@Override
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
// short-circuit on no data allowed, immediately throwing an exception
if (memoryBytesLimit == 0) {
circuitBreak(label, bytes);
}
long newUsed;
// If there is no limit (-1), we can optimize a bit by using
// .addAndGet() instead of looping (because we don't have to check a
// limit), which makes the RamAccountingTermsEnum case faster.
if (this.memoryBytesLimit == -1) {
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
}
} else {
// Otherwise, check the addition and commit the addition, looping if
// there are conflicts. May result in additional logging, but it's
// trace logging and shouldn't be counted on for additions.
long currentUsed;
do {
currentUsed = this.used.get();
newUsed = currentUsed + bytes;
long newUsedWithOverhead = (long) (newUsed * overheadConstant);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
this.name,
new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
}
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.error("[{}] New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
this.name,
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
circuitBreak(label, newUsedWithOverhead);
}
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
} while (!this.used.compareAndSet(currentUsed, newUsed));
}
// Additionally, we need to check that we haven't exceeded the parent's limit
try {
parent.checkParentLimit(label);
} catch (CircuitBreakingException e) {
// If the parent breaker is tripped, this breaker has to be
// adjusted back down because the allocation is "blocked" but the
// breaker has already been incremented
this.used.addAndGet(-bytes);
throw e;
}
return newUsed;
}
/**
* Add an <b>exact</b> number of bytes, not checking for tripping the
* circuit breaker. This bypasses the overheadConstant multiplication.
*
* Also does not check with the parent breaker to see if the parent limit
* has been exceeded.
*
* @param bytes number of bytes to add to the breaker
* @return number of "used" bytes so far
*/
@Override
public long addWithoutBreaking(long bytes) {
long u = used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
}
assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
return u;
}
/**
* @return the number of aggregated "used" bytes so far
*/
@Override
public long getUsed() {
return this.used.get();
}
/**
* @return the number of bytes that can be added before the breaker trips
*/
@Override
public long getLimit() {
return this.memoryBytesLimit;
}
/**
* @return the constant multiplier the breaker uses for aggregations
*/
@Override
public double getOverhead() {
return this.overheadConstant;
}
/**
* @return the number of times the breaker has been tripped
*/
@Override
public long getTrippedCount() {
return this.trippedCount.get();
}
/**
* @return the name of the breaker
*/
public Name getName() {
return this.name;
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.breaker;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Interface for an object that can be incremented, breaking after some
* configured limit has been reached.
*/
public interface CircuitBreaker {
/**
* Enum used for specifying different types of circuit breakers
*/
public static enum Name {
PARENT(0),
FIELDDATA(1),
REQUEST(2);
private int ordinal;
Name(int ordinal) {
this.ordinal = ordinal;
}
public int getSerializableValue() {
return this.ordinal;
}
public static Name readFrom(StreamInput in) throws IOException {
int value = in.readVInt();
switch (value) {
case 0:
return Name.PARENT;
case 1:
return Name.FIELDDATA;
case 2:
return Name.REQUEST;
default:
throw new ElasticsearchIllegalArgumentException("No CircuitBreaker with ordinal: " + value);
}
}
public static void writeTo(Name name, StreamOutput out) throws IOException {
out.writeVInt(name.getSerializableValue());
}
}
/**
* Trip the circuit breaker
* @param fieldName name of the field responsible for tripping the breaker
* @param bytesNeeded bytes asked for but unable to be allocated
*/
public void circuitBreak(String fieldName, long bytesNeeded);
/**
* add bytes to the breaker and maybe trip
* @param bytes number of bytes to add
* @param label string label describing the bytes being added
* @return the number of "used" bytes for the circuit breaker
* @throws CircuitBreakingException
*/
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;
/**
* Adjust the circuit breaker without tripping
*/
public long addWithoutBreaking(long bytes);
/**
* @return the currently used bytes the breaker is tracking
*/
public long getUsed();
/**
* @return maximum number of bytes the circuit breaker can track before tripping
*/
public long getLimit();
/**
* @return overhead of circuit breaker
*/
public double getOverhead();
/**
* @return the number of times the circuit breaker has been tripped
*/
public long getTrippedCount();
/**
* @return the name of the breaker
*/
public Name getName();
}

View File

@ -25,8 +25,26 @@ import org.elasticsearch.ElasticsearchException;
*/
public class CircuitBreakingException extends ElasticsearchException {
// TODO: maybe add more neat metrics here?
private final long bytesWanted;
private final long byteLimit;
public CircuitBreakingException(String message) {
super(message);
this.bytesWanted = 0;
this.byteLimit = 0;
}
public CircuitBreakingException(String message, long bytesWanted, long byteLimit) {
super(message);
this.bytesWanted = bytesWanted;
this.byteLimit = byteLimit;
}
public long getBytesWanted() {
return this.bytesWanted;
}
public long getByteLimit() {
return this.byteLimit;
}
}

View File

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
* MemoryCircuitBreaker is a circuit breaker that breaks once a
* configurable memory limit has been reached.
*/
public class MemoryCircuitBreaker {
public class MemoryCircuitBreaker implements CircuitBreaker {
private final long memoryBytesLimit;
private final double overheadConstant;
@ -77,7 +77,7 @@ public class MemoryCircuitBreaker {
* Method used to trip the breaker
* @throws CircuitBreakingException
*/
public void circuitBreak(String fieldName) throws CircuitBreakingException {
public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreakingException {
this.trippedCount.incrementAndGet();
throw new CircuitBreakingException("Data too large, data for field [" + fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]");
@ -92,10 +92,10 @@ public class MemoryCircuitBreaker {
* @return number of "used" bytes so far
* @throws CircuitBreakingException
*/
public double addEstimateBytesAndMaybeBreak(long bytes, String fieldName) throws CircuitBreakingException {
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
// short-circuit on no data allowed, immediately throwing an exception
if (memoryBytesLimit == 0) {
circuitBreak(fieldName);
circuitBreak(label, bytes);
}
long newUsed;
@ -106,7 +106,7 @@ public class MemoryCircuitBreaker {
newUsed = this.used.addAndGet(bytes);
if (logger.isTraceEnabled()) {
logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed));
new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
}
return newUsed;
}
@ -121,15 +121,15 @@ public class MemoryCircuitBreaker {
long newUsedWithOverhead = (long)(newUsed * overheadConstant);
if (logger.isTraceEnabled()) {
logger.trace("Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
new ByteSizeValue(bytes), fieldName, new ByteSizeValue(newUsed),
new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
}
if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
logger.error("New used memory {} [{}] from field [{}] would be larger than configured breaker: {} [{}], breaking",
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), fieldName,
newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
circuitBreak(fieldName);
circuitBreak(label, newUsedWithOverhead);
}
// Attempt to set the new used value, but make sure it hasn't changed
// underneath us, if it has, keep trying until we are able to set it
@ -161,9 +161,9 @@ public class MemoryCircuitBreaker {
}
/**
* @return the maximum number of bytes before the circuit breaker will trip
* @return the number of bytes that can be added before the breaker trips
*/
public long getMaximum() {
public long getLimit() {
return this.memoryBytesLimit;
}
@ -180,4 +180,11 @@ public class MemoryCircuitBreaker {
public long getTrippedCount() {
return this.trippedCount.get();
}
/**
* @return the name of the breaker
*/
public Name getName() {
return Name.FIELDDATA;
}
}

View File

@ -32,7 +32,7 @@ public enum MemorySizeValue {
* <tt>42</tt> (default assumed unit is byte) or <tt>2mb</tt>, or percentages of the heap size: if
* the heap is 1G, <tt>10%</tt> will be parsed as <tt>100mb</tt>. */
public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) {
if (sValue.endsWith("%")) {
if (sValue != null && sValue.endsWith("%")) {
final String percentAsString = sValue.substring(0, sValue.length() - 1);
try {
final double percent = Double.parseDouble(percentAsString);

View File

@ -33,7 +33,7 @@ abstract class AbstractArray implements BigArray {
@Override
public final void close() {
bigArrays.ramBytesUsed.addAndGet(-ramBytesUsed());
bigArrays.adjustBreaker(-ramBytesUsed());
assert !released : "double release";
released = true;
doClose();

View File

@ -23,8 +23,10 @@ import com.google.common.base.Preconditions;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
@ -32,6 +34,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
@ -39,9 +42,8 @@ import java.util.concurrent.atomic.AtomicLong;
/** Utility class to work with arrays. */
public class BigArrays extends AbstractComponent {
// TODO: switch to a circuit breaker that is shared not only on big arrays level, and applies to other request level data structures
public static final String MAX_SIZE_IN_BYTES_SETTING = "requests.memory.breaker.limit";
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, Long.MAX_VALUE);
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(ImmutableSettings.EMPTY, null, null);
/** Page size in bytes: 16KB */
public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
@ -363,39 +365,73 @@ public class BigArrays extends AbstractComponent {
}
final PageCacheRecycler recycler;
final AtomicLong ramBytesUsed;
final long maxSizeInBytes;
final CircuitBreakerService breakerService;
final boolean checkBreaker;
@Inject
public BigArrays(Settings settings, PageCacheRecycler recycler) {
this(settings, recycler, settings.getAsMemory(MAX_SIZE_IN_BYTES_SETTING, Long.toString(Long.MAX_VALUE)).bytes());
public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
// Checking the breaker is disabled if not specified
this(settings, recycler, breakerService, false);
}
private BigArrays(Settings settings, PageCacheRecycler recycler, final long maxSizeInBytes) {
public BigArrays(Settings settings, PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) {
super(settings);
this.maxSizeInBytes = maxSizeInBytes;
this.checkBreaker = checkBreaker;
this.recycler = recycler;
ramBytesUsed = new AtomicLong();
this.breakerService = breakerService;
}
private void validate(long delta) {
final long totalSizeInBytes = ramBytesUsed.addAndGet(delta);
if (totalSizeInBytes > maxSizeInBytes) {
throw new ElasticsearchIllegalStateException("Maximum number of bytes allocated exceeded: [" + totalSizeInBytes + "] (> " + maxSizeInBytes + ")");
/**
* Adjust the circuit breaker with the given delta, if the delta is
* negative, or checkBreaker is false, the breaker will be adjusted
* without tripping
*/
void adjustBreaker(long delta) {
if (this.breakerService != null) {
CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.Name.REQUEST);
if (this.checkBreaker == true) {
// checking breaker means potentially tripping, but it doesn't
// have to if the delta is negative
if (delta > 0) {
try {
breaker.addEstimateBytesAndMaybeBreak(delta, "<reused_arrays>");
} catch (CircuitBreakingException e) {
// since we've already created the data, we need to
// add it so closing the stream re-adjusts properly
breaker.addWithoutBreaking(delta);
// re-throw the original exception
throw e;
}
} else {
breaker.addWithoutBreaking(delta);
}
} else {
// even if we are not checking the breaker, we need to adjust
// its' totals, so add without breaking
breaker.addWithoutBreaking(delta);
}
}
}
/**
* Return a new instance of this BigArrays class with circuit breaking
* explicitly enabled, instead of only accounting enabled
*/
public BigArrays withCircuitBreaking() {
return new BigArrays(this.settings, this.recycler, this.breakerService, true);
}
private <T extends AbstractBigArray> T resizeInPlace(T array, long newSize) {
final long oldMemSize = array.ramBytesUsed();
array.resize(newSize);
validate(array.ramBytesUsed() - oldMemSize);
adjustBreaker(array.ramBytesUsed() - oldMemSize);
return array;
}
private <T extends BigArray> T validate(T array) {
boolean success = false;
try {
validate(array.ramBytesUsed());
adjustBreaker(array.ramBytesUsed());
success = true;
} finally {
if (!success) {
@ -720,11 +756,4 @@ public class BigArrays extends AbstractComponent {
final long newSize = overSize(minSize, OBJECT_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
return resize(array, newSize);
}
/**
* Return an approximate number of bytes that have been allocated but not released yet.
*/
public long sizeInBytes() {
return ramBytesUsed.get();
}
}

View File

@ -33,7 +33,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.N
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;

View File

@ -38,11 +38,11 @@ import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import java.util.ArrayList;
import java.util.Collection;

View File

@ -21,8 +21,8 @@ package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.FilteredTermsEnum;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.index.fielddata.plain.AbstractIndexFieldData;
import org.elasticsearch.common.breaker.CircuitBreaker;
import java.io.IOException;
@ -36,7 +36,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum {
// Flush every 5mb
private static final long FLUSH_BUFFER_SIZE = 1024 * 1024 * 5;
private final MemoryCircuitBreaker breaker;
private final CircuitBreaker breaker;
private final TermsEnum termsEnum;
private final AbstractIndexFieldData.PerValueEstimator estimator;
private final String fieldName;
@ -44,7 +44,7 @@ public final class RamAccountingTermsEnum extends FilteredTermsEnum {
private long flushBuffer;
public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator,
public RamAccountingTermsEnum(TermsEnum termsEnum, CircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator,
String fieldName) {
super(termsEnum);
this.breaker = breaker;

View File

@ -23,11 +23,12 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.XOrdinalMap;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.fielddata.AtomicOrdinalsFieldData;
import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
@ -52,7 +53,7 @@ public enum GlobalOrdinalsBuilder {
}
final XOrdinalMap ordinalMap = XOrdinalMap.build(null, subs, PackedInts.DEFAULT);
final long memorySizeInBytes = ordinalMap.ramBytesUsed();
breakerService.getBreaker().addWithoutBreaking(memorySizeInBytes);
breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(memorySizeInBytes);
if (logger.isDebugEnabled()) {
logger.debug(

View File

@ -31,8 +31,8 @@ import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.N
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
import java.util.Map;

View File

@ -33,7 +33,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;

View File

@ -29,8 +29,8 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
* A field data implementation that forbids loading and will throw an {@link org.elasticsearch.ElasticsearchIllegalStateException} if you try to load

View File

@ -37,7 +37,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.util.Map;
import java.util.Set;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.index.*;
import org.apache.lucene.util.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
@ -34,7 +35,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
/**
@ -70,7 +71,7 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AtomicDoubleFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -25,6 +25,7 @@ import org.apache.lucene.util.fst.FST;
import org.apache.lucene.util.fst.FST.INPUT_TYPE;
import org.apache.lucene.util.fst.PositiveIntOutputs;
import org.apache.lucene.util.fst.Util;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
@ -33,7 +34,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
*/
@ -63,7 +64,7 @@ public class FSTBytesIndexFieldData extends AbstractIndexOrdinalsFieldData {
Terms terms = reader.terms(getFieldNames().indexName());
AtomicOrdinalsFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AbstractAtomicOrdinalsFieldData.empty();
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.index.*;
import org.apache.lucene.util.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.FloatArray;
@ -33,7 +34,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
/**
@ -68,7 +69,7 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<AtomicNumer
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AtomicDoubleFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -31,7 +31,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.N
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.Terms;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PagedMutable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.DistanceUnit;
@ -38,7 +39,7 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
*/
@ -82,7 +83,7 @@ public class GeoPointCompressedIndexFieldData extends AbstractIndexGeoPointField
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -23,6 +23,7 @@ import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
@ -34,7 +35,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
*/
@ -64,7 +65,7 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractIndexGeoPointFiel
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA));
if (terms == null) {
data = AbstractAtomicGeoPointFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.ramBytesUsed());

View File

@ -27,7 +27,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
public class IndexIndexFieldData extends AbstractIndexOrdinalsFieldData {

View File

@ -22,7 +22,7 @@ import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import java.io.IOException;
@ -33,9 +33,9 @@ import java.io.IOException;
*/
public class NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator {
private final MemoryCircuitBreaker breaker;
private final CircuitBreaker breaker;
NonEstimatingEstimator(MemoryCircuitBreaker breaker) {
NonEstimatingEstimator(CircuitBreaker breaker) {
this.breaker = breaker;
}

View File

@ -27,7 +27,7 @@ import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
@ -38,7 +38,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
@ -88,7 +88,7 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
final AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
AtomicNumericFieldData data = null;
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType(), getFieldNames().fullName());
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getNumericType(), getFieldNames().fullName());
if (terms == null) {
data = AtomicLongFieldData.empty(reader.maxDoc());
estimator.adjustForNoTerms(data.ramBytesUsed());
@ -355,11 +355,11 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
*/
public class PackedArrayEstimator implements PerValueEstimator {
private final MemoryCircuitBreaker breaker;
private final CircuitBreaker breaker;
private final NumericType type;
private final String fieldName;
public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type, String fieldName) {
public PackedArrayEstimator(CircuitBreaker breaker, NumericType type, String fieldName) {
this.breaker = breaker;
this.type = type;
this.fieldName = fieldName;

View File

@ -24,7 +24,7 @@ import org.apache.lucene.index.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
@ -33,7 +33,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import java.io.IOException;
@ -61,7 +61,7 @@ public class PagedBytesIndexFieldData extends AbstractIndexOrdinalsFieldData {
AtomicReader reader = context.reader();
AtomicOrdinalsFieldData data = null;
PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(), getFieldNames().fullName());
PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), getFieldNames().fullName());
Terms terms = reader.terms(getFieldNames().indexName());
if (terms == null) {
data = AbstractAtomicOrdinalsFieldData.empty();
@ -125,11 +125,11 @@ public class PagedBytesIndexFieldData extends AbstractIndexOrdinalsFieldData {
public class PagedBytesEstimator implements PerValueEstimator {
private final AtomicReaderContext context;
private final MemoryCircuitBreaker breaker;
private final CircuitBreaker breaker;
private final String fieldName;
private long estimatedBytes;
PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker, String fieldName) {
PagedBytesEstimator(AtomicReaderContext context, CircuitBreaker breaker, String fieldName) {
this.breaker = breaker;
this.context = context;
this.fieldName = fieldName;

View File

@ -32,7 +32,7 @@ import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -47,7 +47,7 @@ import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
@ -101,7 +101,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
new ParentChildIntersectTermsEnum(reader, UidFieldMapper.NAME, ParentFieldMapper.NAME),
parentTypes
);
ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(), termsEnum);
ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA), termsEnum);
TermsEnum estimatedTermsEnum = estimator.beforeLoad(null);
ObjectObjectOpenHashMap<String, TypeBuilder> typeBuilders = ObjectObjectOpenHashMap.newInstance();
try {
@ -210,13 +210,13 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
*/
public class ParentChildEstimator implements PerValueEstimator {
private final MemoryCircuitBreaker breaker;
private final CircuitBreaker breaker;
private final TermsEnum filteredEnum;
// The TermsEnum is passed in here instead of being generated in the
// beforeLoad() function since it's filtered inside the previous
// TermsEnum wrappers
public ParentChildEstimator(MemoryCircuitBreaker breaker, TermsEnum filteredEnum) {
public ParentChildEstimator(CircuitBreaker breaker, TermsEnum filteredEnum) {
this.breaker = breaker;
this.filteredEnum = filteredEnum;
}
@ -337,7 +337,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
}
breakerService.getBreaker().addWithoutBreaking(ramBytesUsed);
breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed);
if (logger.isDebugEnabled()) {
logger.debug(
"Global-ordinals[_parent] took {}",

View File

@ -29,8 +29,8 @@ import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.N
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData implements IndexOrdinalsFieldData {

View File

@ -29,10 +29,8 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.indices.recovery.RecoverySettings;
@ -81,7 +79,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
* Stats class encapsulating all of the different circuit breaker stats
*/
public class AllCircuitBreakerStats implements Streamable, ToXContent {
private CircuitBreakerStats[] allStats = new CircuitBreakerStats[0];
public AllCircuitBreakerStats() {
}
public AllCircuitBreakerStats(CircuitBreakerStats[] allStats) {
this.allStats = allStats;
}
public CircuitBreakerStats[] getAllStats() {
return this.allStats;
}
public CircuitBreakerStats getStats(CircuitBreaker.Name name) {
for (CircuitBreakerStats stats : allStats) {
if (stats.getName() == name) {
return stats;
}
}
return null;
}
public static AllCircuitBreakerStats readOptionalAllCircuitBreakerStats(StreamInput in) throws IOException {
AllCircuitBreakerStats stats = in.readOptionalStreamable(new AllCircuitBreakerStats());
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int statCount = in.readVInt();
CircuitBreakerStats[] newStats = new CircuitBreakerStats[statCount];
for (int i = 0; i < statCount; i++) {
CircuitBreakerStats stats = new CircuitBreakerStats();
stats.readFrom(in);
newStats[i] = stats;
}
allStats = newStats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(allStats.length);
for (CircuitBreakerStats stats : allStats) {
if (stats != null) {
stats.writeTo(out);
}
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.BREAKERS);
for (CircuitBreakerStats stats : allStats) {
if (stats != null) {
stats.toXContent(builder, params);
}
}
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString BREAKERS = new XContentBuilderString("breakers");
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
/**
* Settings for a {@link CircuitBreaker}
*/
public class BreakerSettings {
private final CircuitBreaker.Name name;
private final long limitBytes;
private final double overhead;
public BreakerSettings(CircuitBreaker.Name name, long limitBytes, double overhead) {
this.name = name;
this.limitBytes = limitBytes;
this.overhead = overhead;
}
public CircuitBreaker.Name getName() {
return this.name;
}
public long getLimit() {
return this.limitBytes;
}
public double getOverhead() {
return this.overhead;
}
@Override
public String toString() {
return "[" + this.name.toString() +
",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) +
",overhead=" + this.overhead + "]";
}
}

View File

@ -17,24 +17,23 @@
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
* Interface for Circuit Breaker services, which provide breakers to classes
* that load field data.
*/
public interface CircuitBreakerService {
public class CircuitBreakerModule extends AbstractModule {
/**
* @return the breaker that can be used to register estimates against
*/
public MemoryCircuitBreaker getBreaker();
public static final String IMPL = "indices.breaker.breaker_impl";
private final Settings settings;
/**
* @return stats about the breaker
*/
public FieldDataBreakerStats stats();
public CircuitBreakerModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(CircuitBreakerService.class).to(settings.getAsClass(IMPL, HierarchyCircuitBreakerService.class)).asEagerSingleton();
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.breaker;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
/**
* Interface for Circuit Breaker services, which provide breakers to classes
* that load field data.
*/
public abstract class CircuitBreakerService extends AbstractLifecycleComponent<InternalCircuitBreakerService> {
protected CircuitBreakerService(Settings settings) {
super(settings);
}
/**
* @return the breaker that can be used to register estimates against
*/
public abstract CircuitBreaker getBreaker(CircuitBreaker.Name type);
/**
* @return stats about all breakers
*/
public abstract AllCircuitBreakerStats stats();
/**
* @return stats about a specific breaker
*/
public abstract CircuitBreakerStats stats(CircuitBreaker.Name name);
protected void doStart() throws ElasticsearchException {
}
protected void doStop() throws ElasticsearchException {
}
protected void doClose() throws ElasticsearchException {
}
}

View File

@ -17,9 +17,10 @@
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
package org.elasticsearch.indices.breaker;
import org.elasticsearch.Version;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -29,30 +30,37 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.Locale;
/**
* Class encapsulating stats about the circuit breaker
*/
public class FieldDataBreakerStats implements Streamable, ToXContent {
public class CircuitBreakerStats implements Streamable, ToXContent {
private long maximum;
private CircuitBreaker.Name name;
private long limit;
private long estimated;
private long trippedCount;
private double overhead;
FieldDataBreakerStats() {
CircuitBreakerStats() {
}
public FieldDataBreakerStats(long maximum, long estimated, double overhead, long trippedCount) {
this.maximum = maximum;
public CircuitBreakerStats(CircuitBreaker.Name name, long limit, long estimated, double overhead, long trippedCount) {
this.name = name;
this.limit = limit;
this.estimated = estimated;
this.trippedCount = trippedCount;
this.overhead = overhead;
}
public long getMaximum() {
return this.maximum;
public CircuitBreaker.Name getName() {
return this.name;
}
public long getLimit() {
return this.limit;
}
public long getEstimated() {
@ -67,14 +75,15 @@ public class FieldDataBreakerStats implements Streamable, ToXContent {
return this.overhead;
}
public static FieldDataBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException {
FieldDataBreakerStats stats = in.readOptionalStreamable(new FieldDataBreakerStats());
public static CircuitBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException {
CircuitBreakerStats stats = in.readOptionalStreamable(new CircuitBreakerStats());
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
maximum = in.readLong();
// limit is the maximum from the old circuit breaker stats for backwards compatibility
limit = in.readLong();
estimated = in.readLong();
overhead = in.readDouble();
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
@ -82,23 +91,31 @@ public class FieldDataBreakerStats implements Streamable, ToXContent {
} else {
this.trippedCount = -1;
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.name = CircuitBreaker.Name.readFrom(in);
} else {
this.name = CircuitBreaker.Name.FIELDDATA;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(maximum);
out.writeLong(limit);
out.writeLong(estimated);
out.writeDouble(overhead);
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeLong(trippedCount);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
CircuitBreaker.Name.writeTo(name, out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.BREAKER);
builder.field(Fields.MAX, maximum);
builder.field(Fields.MAX_HUMAN, new ByteSizeValue(maximum));
builder.startObject(name.toString().toLowerCase(Locale.ROOT));
builder.field(Fields.LIMIT, limit);
builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit));
builder.field(Fields.ESTIMATED, estimated);
builder.field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated));
builder.field(Fields.OVERHEAD, overhead);
@ -107,10 +124,17 @@ public class FieldDataBreakerStats implements Streamable, ToXContent {
return builder;
}
@Override
public String toString() {
return "[" + this.name.toString() +
",limit=" + this.limit + "/" + new ByteSizeValue(this.limit) +
",estimated=" + this.estimated + "/" + new ByteSizeValue(this.estimated) +
",overhead=" + this.overhead + ",tripped=" + this.trippedCount + "]";
}
static final class Fields {
static final XContentBuilderString BREAKER = new XContentBuilderString("fielddata_breaker");
static final XContentBuilderString MAX = new XContentBuilderString("maximum_size_in_bytes");
static final XContentBuilderString MAX_HUMAN = new XContentBuilderString("maximum_size");
static final XContentBuilderString LIMIT = new XContentBuilderString("limit_size_in_bytes");
static final XContentBuilderString LIMIT_HUMAN = new XContentBuilderString("limit_size");
static final XContentBuilderString ESTIMATED = new XContentBuilderString("estimated_size_in_bytes");
static final XContentBuilderString ESTIMATED_HUMAN = new XContentBuilderString("estimated_size");
static final XContentBuilderString OVERHEAD = new XContentBuilderString("overhead");

View File

@ -0,0 +1,235 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.breaker;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.collect.Lists.newArrayList;
/**
* CircuitBreakerService that attempts to redistribute space between breakers
* if tripped
*/
public class HierarchyCircuitBreakerService extends CircuitBreakerService {
private volatile ImmutableMap<CircuitBreaker.Name, CircuitBreaker> breakers;
public static final String TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.total.limit";
public static final String DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT = "70%";
public static final String FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.fielddata.limit";
public static final String FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.fielddata.overhead";
public static final String DEFAULT_FIELDDATA_BREAKER_LIMIT = "60%";
public static final double DEFAULT_FIELDDATA_OVERHEAD_CONSTANT = 1.03;
public static final String REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING = "indices.breaker.request.limit";
public static final String REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.breaker.request.overhead";
public static final String DEFAULT_REQUEST_BREAKER_LIMIT = "40%";
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings requestSettings;
// Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0);
@Inject
public HierarchyCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
// This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING
// setting to keep backwards compatibility with 1.2, it can be safely
// removed when compatibility with 1.2 is no longer needed
String compatibilityFielddataLimitDefault = DEFAULT_FIELDDATA_BREAKER_LIMIT;
ByteSizeValue compatibilityFielddataLimit = settings.getAsMemory(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, null);
if (compatibilityFielddataLimit != null) {
compatibilityFielddataLimitDefault = compatibilityFielddataLimit.toString();
}
// This uses the old InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING
// setting to keep backwards compatibility with 1.2, it can be safely
// removed when compatibility with 1.2 is no longer needed
double compatibilityFielddataOverheadDefault = DEFAULT_FIELDDATA_OVERHEAD_CONSTANT;
Double compatibilityFielddataOverhead = settings.getAsDouble(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
if (compatibilityFielddataOverhead != null) {
compatibilityFielddataOverheadDefault = compatibilityFielddataOverhead;
}
this.fielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA,
settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, compatibilityFielddataLimitDefault).bytes(),
settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, compatibilityFielddataOverheadDefault));
this.requestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST,
settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_REQUEST_BREAKER_LIMIT).bytes(),
settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0));
// Validate the configured settings
validateSettings(new BreakerSettings[] {this.requestSettings, this.fielddataSettings});
this.parentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT,
settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, DEFAULT_TOTAL_CIRCUIT_BREAKER_LIMIT).bytes(), 1.0);
if (logger.isTraceEnabled()) {
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
}
Map<CircuitBreaker.Name, CircuitBreaker> tempBreakers = new HashMap<>();
tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(fielddataSettings, logger, this, CircuitBreaker.Name.FIELDDATA));
tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(requestSettings, logger, this, CircuitBreaker.Name.REQUEST));
this.breakers = ImmutableMap.copyOf(tempBreakers);
nodeSettingsService.addListener(new ApplySettings());
}
public class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean changed = false;
// Fielddata settings
BreakerSettings newFielddataSettings = HierarchyCircuitBreakerService.this.fielddataSettings;
ByteSizeValue newFielddataMax = settings.getAsMemory(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, null);
Double newFielddataOverhead = settings.getAsDouble(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
if (newFielddataMax != null || newFielddataOverhead != null) {
changed = true;
long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes();
newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead;
newFielddataSettings = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, newFielddataLimitBytes, newFielddataOverhead);
}
// Request settings
BreakerSettings newRequestSettings = HierarchyCircuitBreakerService.this.requestSettings;
ByteSizeValue newRequestMax = settings.getAsMemory(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, null);
Double newRequestOverhead = settings.getAsDouble(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, null);
if (newRequestMax != null || newRequestOverhead != null) {
changed = true;
long newRequestLimitBytes = newRequestMax == null ? HierarchyCircuitBreakerService.this.requestSettings.getLimit() : newRequestMax.bytes();
newRequestOverhead = newRequestOverhead == null ? HierarchyCircuitBreakerService.this.requestSettings.getOverhead() : newRequestOverhead;
newRequestSettings = new BreakerSettings(CircuitBreaker.Name.REQUEST, newRequestLimitBytes, newRequestOverhead);
}
// Parent settings
BreakerSettings newParentSettings = HierarchyCircuitBreakerService.this.parentSettings;
long oldParentMax = HierarchyCircuitBreakerService.this.parentSettings.getLimit();
ByteSizeValue newParentMax = settings.getAsMemory(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, null);
if (newParentMax != null && (newParentMax.bytes() != oldParentMax)) {
changed = true;
newParentSettings = new BreakerSettings(CircuitBreaker.Name.PARENT, newParentMax.bytes(), 1.0);
}
if (changed) {
// change all the things
validateSettings(new BreakerSettings[] {newFielddataSettings, newRequestSettings});
logger.info("Updating settings parent: {}, fielddata: {}, request: {}", newParentSettings, newFielddataSettings, newRequestSettings);
HierarchyCircuitBreakerService.this.parentSettings = newParentSettings;
HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
HierarchyCircuitBreakerService.this.requestSettings = newRequestSettings;
Map<CircuitBreaker.Name, CircuitBreaker> tempBreakers = new HashMap<>();
tempBreakers.put(CircuitBreaker.Name.FIELDDATA, new ChildMemoryCircuitBreaker(newFielddataSettings,
(ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.FIELDDATA),
logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.FIELDDATA));
tempBreakers.put(CircuitBreaker.Name.REQUEST, new ChildMemoryCircuitBreaker(newRequestSettings,
(ChildMemoryCircuitBreaker)HierarchyCircuitBreakerService.this.breakers.get(CircuitBreaker.Name.REQUEST),
logger, HierarchyCircuitBreakerService.this, CircuitBreaker.Name.REQUEST));
HierarchyCircuitBreakerService.this.breakers = ImmutableMap.copyOf(tempBreakers);
}
}
}
/**
* Validate that child settings are valid
* @throws ElasticsearchIllegalStateException
*/
public static void validateSettings(BreakerSettings[] childrenSettings) throws ElasticsearchIllegalStateException {
for (BreakerSettings childSettings : childrenSettings) {
// If the child is disabled, ignore it
if (childSettings.getLimit() == -1) {
continue;
}
if (childSettings.getOverhead() < 0) {
throw new ElasticsearchIllegalStateException("Child breaker overhead " + childSettings + " must be non-negative");
}
}
}
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name name) {
return this.breakers.get(name);
}
@Override
public AllCircuitBreakerStats stats() {
long parentEstimated = 0;
List<CircuitBreakerStats> allStats = newArrayList();
// Gather the "estimated" count for the parent breaker by adding the
// estimations for each individual breaker
for (CircuitBreaker breaker : this.breakers.values()) {
allStats.add(stats(breaker.getName()));
parentEstimated += breaker.getUsed();
}
// Manually add the parent breaker settings since they aren't part of the breaker map
allStats.add(new CircuitBreakerStats(CircuitBreaker.Name.PARENT, parentSettings.getLimit(),
parentEstimated, 1.0, parentTripCount.get()));
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
}
@Override
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
CircuitBreaker breaker = this.breakers.get(name);
return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
}
/**
* Checks whether the parent breaker has been tripped
* @param label
* @throws CircuitBreakingException
*/
public void checkParentLimit(String label) throws CircuitBreakingException {
long totalUsed = 0;
for (CircuitBreaker breaker : this.breakers.values()) {
totalUsed += (breaker.getUsed() * breaker.getOverhead());
}
long parentLimit = this.parentSettings.getLimit();
if (totalUsed > parentLimit) {
this.parentTripCount.incrementAndGet();
throw new CircuitBreakingException("[PARENT] Data too large, data for [" +
label + "] would be larger than limit of [" +
parentLimit + "/" + new ByteSizeValue(parentLimit) + "]",
totalUsed, parentLimit);
}
}
}

View File

@ -16,11 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
package org.elasticsearch.indices.breaker;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -32,7 +31,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
* that can be used to keep track of memory usage across the node, preventing
* actions that could cause an {@link OutOfMemoryError} on the node.
*/
public class InternalCircuitBreakerService extends AbstractLifecycleComponent<InternalCircuitBreakerService> implements CircuitBreakerService {
public class InternalCircuitBreakerService extends CircuitBreakerService {
public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";
@ -87,7 +86,8 @@ public class InternalCircuitBreakerService extends AbstractLifecycleComponent<In
/**
* @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage
*/
public MemoryCircuitBreaker getBreaker() {
public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) {
// Return the only breaker, regardless of name
return this.breaker;
}
@ -104,19 +104,17 @@ public class InternalCircuitBreakerService extends AbstractLifecycleComponent<In
}
@Override
public FieldDataBreakerStats stats() {
return new FieldDataBreakerStats(breaker.getMaximum(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
public AllCircuitBreakerStats stats() {
return new AllCircuitBreakerStats(new CircuitBreakerStats[]{
new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(),
breaker.getOverhead(), breaker.getTrippedCount())
});
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
// There is only a single breaker, so always return it
return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, breaker.getLimit(), breaker.getUsed(),
breaker.getOverhead(), breaker.getTrippedCount());
}
}

View File

@ -17,31 +17,41 @@
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
/**
* Class that returns a breaker that never breaks
*/
public class NoneCircuitBreakerService implements CircuitBreakerService {
public class NoneCircuitBreakerService extends CircuitBreakerService {
private final ESLogger logger = Loggers.getLogger(NoneCircuitBreakerService.class);
private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger);
public NoneCircuitBreakerService() {}
public NoneCircuitBreakerService() {
super(ImmutableSettings.EMPTY);
}
@Override
public MemoryCircuitBreaker getBreaker() {
public MemoryCircuitBreaker getBreaker(CircuitBreaker.Name name) {
return breaker;
}
@Override
public FieldDataBreakerStats stats() {
return new FieldDataBreakerStats(-1, -1, 0, 0);
public AllCircuitBreakerStats stats() {
return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.Name.FIELDDATA)});
}
@Override
public CircuitBreakerStats stats(CircuitBreaker.Name name) {
return new CircuitBreakerStats(CircuitBreaker.Name.FIELDDATA, -1, -1, 0, 0);
}
}

View File

@ -20,11 +20,12 @@
package org.elasticsearch.indices.fielddata.cache;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
* A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about
@ -48,7 +49,7 @@ public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listen
@Override
public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
circuitBreakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.node.internal;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -39,7 +38,6 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
@ -49,7 +47,6 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -72,6 +69,7 @@ import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.index.search.shape.ShapeModule;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -162,6 +160,7 @@ public final class InternalNode implements Node {
modules.add(new Version.Module(version));
modules.add(new CacheRecyclerModule(settings));
modules.add(new PageCacheRecyclerModule(settings));
modules.add(new CircuitBreakerModule(settings));
modules.add(new BigArraysModule(settings));
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new SettingsModule(settings));

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;

View File

@ -122,7 +122,7 @@ public class PercolateContext extends SearchContext {
this.types = new String[]{request.documentType()};
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays;
this.bigArrays = bigArrays.withCircuitBreaking();
this.querySearchResult = new QuerySearchResult(0, searchShardTarget);
this.engineSearcher = indexShard.acquireSearcher("percolate");
this.searcher = new ContextIndexSearcher(this, engineSearcher);

View File

@ -193,7 +193,8 @@ public class DefaultSearchContext extends SearchContext {
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays;
// SearchContexts use a BigArrays that can circuit break
this.bigArrays = bigArrays.withCircuitBreaking();
this.dfsResult = new DfsSearchResult(id, shardTarget);
this.queryResult = new QuerySearchResult(id, shardTarget);
this.fetchResult = new FetchSearchResult(id, shardTarget);

View File

@ -37,7 +37,7 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import java.util.Random;

View File

@ -19,11 +19,17 @@
package org.elasticsearch.common.breaker;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
@ -75,6 +81,127 @@ public class MemoryCircuitBreakerTests extends ElasticsearchTestCase {
assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L));
}
@Test
public void testThreadedUpdatesToChildBreaker() throws Exception {
final int NUM_THREADS = 5;
final int BYTES_PER_THREAD = 1000;
final Thread[] threads = new Thread[NUM_THREADS];
final AtomicBoolean tripped = new AtomicBoolean(false);
final AtomicReference<Throwable> lastException = new AtomicReference<>(null);
final AtomicReference<ChildMemoryCircuitBreaker> breakerRef = new AtomicReference<>(null);
final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) {
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name type) {
return breakerRef.get();
}
@Override
public void checkParentLimit(String label) throws CircuitBreakingException {
// never trip
}
};
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0);
final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger,
(HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST);
breakerRef.set(breaker);
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < BYTES_PER_THREAD; j++) {
try {
breaker.addEstimateBytesAndMaybeBreak(1L, "test");
} catch (CircuitBreakingException e) {
if (tripped.get()) {
assertThat("tripped too many times", true, equalTo(false));
} else {
assertThat(tripped.compareAndSet(false, true), equalTo(true));
}
} catch (Throwable e2) {
lastException.set(e2);
}
}
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
assertThat("no other exceptions were thrown", lastException.get(), equalTo(null));
assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true));
assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(1L));
}
@Test
public void testThreadedUpdatesToChildBreakerWithParentLimit() throws Exception {
final int NUM_THREADS = 5;
final int BYTES_PER_THREAD = 1000;
final Thread[] threads = new Thread[NUM_THREADS];
final AtomicInteger tripped = new AtomicInteger(0);
final AtomicReference<Throwable> lastException = new AtomicReference<>(null);
final AtomicInteger parentTripped = new AtomicInteger(0);
final AtomicReference<ChildMemoryCircuitBreaker> breakerRef = new AtomicReference<>(null);
final CircuitBreakerService service = new HierarchyCircuitBreakerService(ImmutableSettings.EMPTY, new NodeSettingsService(ImmutableSettings.EMPTY)) {
@Override
public CircuitBreaker getBreaker(CircuitBreaker.Name type) {
return breakerRef.get();
}
@Override
public void checkParentLimit(String label) throws CircuitBreakingException {
// Parent will trip right before regular breaker would trip
if (getBreaker(CircuitBreaker.Name.REQUEST).getUsed() > (BYTES_PER_THREAD * NUM_THREADS) - 2) {
parentTripped.incrementAndGet();
throw new CircuitBreakingException("parent tripped");
}
}
};
final BreakerSettings settings = new BreakerSettings(CircuitBreaker.Name.REQUEST, (BYTES_PER_THREAD * NUM_THREADS) - 1, 1.0);
final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker(settings, logger,
(HierarchyCircuitBreakerService)service, CircuitBreaker.Name.REQUEST);
breakerRef.set(breaker);
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < BYTES_PER_THREAD; j++) {
try {
breaker.addEstimateBytesAndMaybeBreak(1L, "test");
} catch (CircuitBreakingException e) {
tripped.incrementAndGet();
if (tripped.get() > 2) {
assertThat("tripped too many times: " + tripped.get(), true, equalTo(false));
}
} catch (Throwable e2) {
lastException.set(e2);
}
}
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
assertThat("no other exceptions were thrown", lastException.get(), equalTo(null));
assertThat("breaker was tripped exactly once", breaker.getTrippedCount(), equalTo(0L));
assertThat("parent breaker was tripped exactly twice", parentTripped.get(), equalTo(2));
assertThat("total breaker was tripped exactly twice", tripped.get(), equalTo(2));
}
@Test
public void testConstantFactor() throws Exception {
final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.jboss.netty.buffer.ChannelBuffer;
@ -49,7 +50,7 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
bigarrays = new BigArrays(ImmutableSettings.EMPTY, null);
bigarrays = new BigArrays(ImmutableSettings.EMPTY, null, new NoneCircuitBreakerService());
}
@After

View File

@ -20,9 +20,13 @@
package org.elasticsearch.common.util;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
@ -37,7 +41,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
public static BigArrays randombigArrays() {
final PageCacheRecycler recycler = randomBoolean() ? null : new MockPageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool("BigArraysTests"));
return new MockBigArrays(ImmutableSettings.EMPTY, recycler);
return new MockBigArrays(ImmutableSettings.EMPTY, recycler, new NoneCircuitBreakerService());
}
private BigArrays bigArrays;
@ -283,7 +287,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
// not equal: contents differ
final ByteArray a3 = byteArrayWithBytes(new byte[]{1,2,3});
final ByteArray a4 = byteArrayWithBytes(new byte[]{1,1,3});
final ByteArray a4 = byteArrayWithBytes(new byte[]{1, 1, 3});
assertFalse(bigArrays.equals(a3, a4));
a3.close();
a4.close();
@ -329,58 +333,51 @@ public class BigArraysTests extends ElasticsearchTestCase {
return bytearray;
}
public void testByteAccounting() throws Exception {
for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) {
BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, Long.MAX_VALUE).build(), null);
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
final int size = scaledRandomIntBetween(5, 1 << 16);
BigArray array = (BigArray) create.invoke(bigArrays, size);
assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class);
int newSize = scaledRandomIntBetween(5, 1 << 16);
array = (BigArray) resize.invoke(bigArrays, array, newSize);
assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
array.close();
assertEquals(0, bigArrays.sizeInBytes());
}
}
public void testMaxSizeExceededOnNew() throws Exception {
final int size = scaledRandomIntBetween(5, 1 << 22);
for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) {
BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, randomIntBetween(1, size)).build(), null);
HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService(
ImmutableSettings.builder()
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, size)
.build(),
new NodeSettingsService(ImmutableSettings.EMPTY));
BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking();
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
try {
create.invoke(bigArrays, size);
fail("expected an exception on " + create);
} catch (InvocationTargetException e) {
assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException);
assertTrue(e.getCause() instanceof CircuitBreakingException);
}
assertEquals(0, bigArrays.sizeInBytes());
assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
}
}
public void testMaxSizeExceededOnResize() throws Exception {
for (String type : Arrays.asList("Byte", "Int", "Long", "Float", "Double", "Object")) {
final long maxSize = randomIntBetween(1 << 10, 1 << 22);
BigArrays bigArrays = new BigArrays(ImmutableSettings.builder().put(BigArrays.MAX_SIZE_IN_BYTES_SETTING, maxSize).build(), null);
HierarchyCircuitBreakerService hcbs = new HierarchyCircuitBreakerService(
ImmutableSettings.builder()
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, maxSize)
.build(),
new NodeSettingsService(ImmutableSettings.EMPTY));
BigArrays bigArrays = new BigArrays(ImmutableSettings.EMPTY, null, hcbs).withCircuitBreaking();
Method create = BigArrays.class.getMethod("new" + type + "Array", long.class);
final int size = scaledRandomIntBetween(1, 20);
BigArray array = (BigArray) create.invoke(bigArrays, size);
Method resize = BigArrays.class.getMethod("resize", array.getClass().getInterfaces()[0], long.class);
while (true) {
long newSize = array.size() * 2;
assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
try {
array = (BigArray) resize.invoke(bigArrays, array, newSize);
} catch (InvocationTargetException e) {
assertTrue(e.getCause() instanceof ElasticsearchIllegalStateException);
assertTrue(e.getCause() instanceof CircuitBreakingException);
break;
}
}
assertEquals(array.ramBytesUsed(), bigArrays.sizeInBytes());
assertEquals(array.ramBytesUsed(), hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
array.close();
assertEquals(0, bigArrays.sizeInBytes());
assertEquals(0, hcbs.getBreaker(CircuitBreaker.Name.REQUEST).getUsed());
}
}

View File

@ -42,8 +42,8 @@ import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;

View File

@ -39,10 +39,8 @@ import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -101,4 +99,4 @@ public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase {
injector.getInstance(ThreadPool.class).shutdownNow();
}
}
}

View File

@ -39,10 +39,8 @@ import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -110,4 +108,4 @@ public class IndexQueryParserPluginTests extends ElasticsearchTestCase {
injector.getInstance(ThreadPool.class).shutdownNow();
}
}
}

View File

@ -1,169 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.util.Arrays;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Integration tests for InternalCircuitBreakerService
*/
@ClusterScope(scope = TEST, randomDynamicTemplates = false)
public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
private String randomRidiculouslySmallLimit() {
// 3 different ways to say 100 bytes
return randomFrom(Arrays.asList("100b", "100"));
//, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes!
}
@Test
@TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
public void testMemoryBreaker() {
assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
final Client client = client();
try {
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(300, 1000);
for (long id = 0; id < docCount; id++) {
client.prepareIndex("cb-test", "type", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
}
// refresh
refresh();
// execute a search that loads field data (sorting on the "test" field)
client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
.execute().actionGet();
// clear field data cache (thus setting the loaded field data back to 0)
client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet();
// Update circuit breaker settings
Settings settings = settingsBuilder()
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit())
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
// execute a search that loads field data (sorting on the "test" field)
// again, this time it should trip the breaker
assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for field [test] would be larger than limit of [100/100b]"));
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
FieldDataBreakerStats breakerStats = stat.getBreaker();
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
} finally {
// Reset settings
Settings resetSettings = settingsBuilder()
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1")
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
}
}
@Test
@TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
public void testRamAccountingTermsEnum() {
final Client client = client();
try {
// Create an index where the mappings have a field data filter
assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " +
"{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}"));
// Wait 10 seconds for green
client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet();
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(300, 1000);
for (long id = 0; id < docCount; id++) {
client.prepareIndex("ramtest", "type", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
}
// refresh
refresh();
// execute a search that loads field data (sorting on the "test" field)
client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
.execute().actionGet();
// clear field data cache (thus setting the loaded field data back to 0)
client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet();
// Update circuit breaker settings
Settings settings = settingsBuilder()
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, randomRidiculouslySmallLimit())
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
// execute a search that loads field data (sorting on the "test" field)
// again, this time it should trip the breaker
assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for field [test] would be larger than limit of [100/100b]"));
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
FieldDataBreakerStats breakerStats = stat.getBreaker();
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
} finally {
// Reset settings
Settings resetSettings = settingsBuilder()
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1")
.put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
}
}
}

View File

@ -0,0 +1,288 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.memory.breaker;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.util.Arrays;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.*;
/**
* Integration tests for InternalCircuitBreakerService
*/
@ClusterScope(scope = TEST, randomDynamicTemplates = false)
public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {
private String randomRidiculouslySmallLimit() {
// 3 different ways to say 100 bytes
return randomFrom(Arrays.asList("100b", "100"));
//, (10000. / JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) + "%")); // this is prone to rounding errors and will fail if JVM memory changes!
}
@Test
@TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
public void testMemoryBreaker() {
assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
final Client client = client();
try {
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(300, 1000);
for (long id = 0; id < docCount; id++) {
client.prepareIndex("cb-test", "type", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
}
// refresh
refresh();
// execute a search that loads field data (sorting on the "test" field)
client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
.execute().actionGet();
// clear field data cache (thus setting the loaded field data back to 0)
client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet();
// Update circuit breaker settings
Settings settings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit())
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
// execute a search that loads field data (sorting on the "test" field)
// again, this time it should trip the breaker
assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for [test] would be larger than limit of [100/100b]"));
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA);
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
} finally {
// Reset settings
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
}
}
@Test
@TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
public void testRamAccountingTermsEnum() {
final Client client = client();
try {
// Create an index where the mappings have a field data filter
assertAcked(prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " +
"{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}"));
// Wait 10 seconds for green
client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet();
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(300, 1000);
for (long id = 0; id < docCount; id++) {
client.prepareIndex("ramtest", "type", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
}
// refresh
refresh();
// execute a search that loads field data (sorting on the "test" field)
client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}")
.execute().actionGet();
// clear field data cache (thus setting the loaded field data back to 0)
client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet();
// Update circuit breaker settings
Settings settings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, randomRidiculouslySmallLimit())
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();
// execute a search that loads field data (sorting on the "test" field)
// again, this time it should trip the breaker
assertFailures(client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for [test] would be larger than limit of [100/100b]"));
NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA);
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
} finally {
// Reset settings
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
}
}
/**
* Test that a breaker correctly redistributes to a different breaker, in
* this case, the fielddata breaker borrows space from the request breaker
*/
@Test
@TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
public void testParentChecking() {
assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
Client client = client();
try {
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(300, 1000);
for (long id = 0; id < docCount; id++) {
client.prepareIndex("cb-test", "type", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
}
refresh();
// We need the request limit beforehand, just from a single node because the limit should always be the same
long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get()
.getNodes()[0].getBreaker().getStats(CircuitBreaker.Name.REQUEST).getLimit();
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "10b")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
// Perform a search to load field data for the "test" field
try {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
} catch (Exception e) {
String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]";
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
}
assertFailures(client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}"),
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for [test] would be larger than limit of [10/10b]"));
// Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't
resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "15b")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "90%")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.0)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
// Perform a search to load field data for the "test" field
try {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
} catch (Exception e) {
String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]";
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
}
} finally {
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "-1")
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT)
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.DEFAULT_FIELDDATA_OVERHEAD_CONSTANT)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
}
}
@Test
@TestLogging("org.elasticsearch.indices.memory.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
public void testRequestBreaker() {
assertAcked(prepareCreate("cb-test", 1, settingsBuilder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
Client client = client();
try {
// Make request breaker limited to a small amount
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "10b")
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
// index some different terms so we have some field data for loading
int docCount = scaledRandomIntBetween(300, 1000);
for (long id = 0; id < docCount; id++) {
client.prepareIndex("cb-test", "type", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", id).map()).execute().actionGet();
}
refresh();
// A cardinality aggregation uses BigArrays and thus the REQUEST breaker
try {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get();
fail("aggregation should have tripped the breaker");
} catch (Exception e) {
String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
}
} finally {
Settings resetSettings = settingsBuilder()
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.DEFAULT_REQUEST_BREAKER_LIMIT)
.build();
client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.memory.breaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
/**
* Unit tests for the circuit breaker
*/
public class CircuitBreakerUnitTests extends ElasticsearchTestCase {
public static long pctBytes(String percentString) {
return ImmutableSettings.EMPTY.getAsMemory("", percentString).bytes();
}
@Test
public void testBreakerSettingsValidationWithValidSettings() {
// parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20}
BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), 1.0);
BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0);
HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
// parent: {:limit 70}, fd: {:limit 40}, request: {:limit 30}
fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("40%"), 1.0);
request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("30%"), 1.0);
HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
}
@Test
public void testBreakerSettingsValidationNegativeOverhead() {
// parent: {:limit 70}, fd: {:limit 50}, request: {:limit 20}
BreakerSettings fd = new BreakerSettings(CircuitBreaker.Name.FIELDDATA, pctBytes("50%"), -0.1);
BreakerSettings request = new BreakerSettings(CircuitBreaker.Name.REQUEST, pctBytes("20%"), 1.0);
try {
HierarchyCircuitBreakerService.validateSettings(new BreakerSettings[]{fd, request});
fail("settings are invalid but validate settings did not throw an exception");
} catch (Exception e) {
assertThat("Incorrect message: " + e.getMessage(),
e.getMessage().contains("must be non-negative"), equalTo(true));
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
package org.elasticsearch.indices.memory.breaker;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DirectoryReader;
@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -58,7 +59,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException {
for (NodeStats node : client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet().getNodes()) {
assertThat("Breaker is not set to 0", node.getBreaker().getEstimated(), equalTo(0L));
assertThat("Breaker is not set to 0", node.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
String mapping = XContentFactory.jsonBuilder()
@ -144,7 +145,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : resp.getNodes()) {
assertThat("Breaker is set to 0", stats.getBreaker().getEstimated(), equalTo(0L));
assertThat("Breaker is set to 0", stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
for (int i = 0; i < numSearches; i++) {
@ -180,7 +181,8 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping, stats.getBreaker().getEstimated(), equalTo(0L));
assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping,
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;
@ -194,8 +195,10 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat("Breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getEstimated(), equalTo(0L));
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
assertThat("Request breaker not reset to 0 on node: " + stats.getNode(),
stats.getBreaker().getStats(CircuitBreaker.Name.REQUEST).getEstimated(), equalTo(0L));
}
}
}

View File

@ -81,7 +81,7 @@ public class TestSearchContext extends SearchContext {
public TestSearchContext(ThreadPool threadPool, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, FilterCache filterCache, IndexFieldDataService indexFieldDataService) {
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays;
this.bigArrays = bigArrays.withCircuitBreaking();
this.indexService = indexService;
this.filterCache = filterCache;
this.indexFieldDataService = indexFieldDataService;

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.*;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.*;
@ -68,43 +69,23 @@ public class MockBigArrays extends BigArrays {
}
}
}
// arrays that are not fully released.
ArrayList<BigArrays> badBigArrays = new ArrayList<>();
synchronized (INSTANCES) {
for (final BigArrays bigArrays : INSTANCES) {
// BigArrays are used on the network layer and the cluster is shared across tests so nodes might still be talking to
// each other a bit after the test finished, wait a bit for things to stabilize if so
final boolean sizeIsZero = ElasticsearchTestCase.awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return bigArrays.sizeInBytes() == 0;
}
});
if (!sizeIsZero) {
final long sizeInBytes = bigArrays.sizeInBytes();
if (sizeInBytes != 0) {
badBigArrays.add(bigArrays);
}
}
}
}
if (!badBigArrays.isEmpty()) {
INSTANCES.removeAll(badBigArrays);
StringBuilder msg = new StringBuilder("Found [").append(badBigArrays.size()).append("] big arrays which were not fully released. Here is a list of the first 20:");
for (int i = 0; i < Math.min(20, badBigArrays.size()); i++) {
msg.append("\nbigArray instance with [").append(badBigArrays.get(i).sizeInBytes()).append("] bytes");
}
throw new AssertionError(msg.toString());
}
}
private final Random random;
private final Settings settings;
private final PageCacheRecycler recycler;
private final CircuitBreakerService breakerService;
@Inject
public MockBigArrays(Settings settings, PageCacheRecycler recycler) {
super(settings, recycler);
public MockBigArrays(Settings settings, PageCacheRecycler recycler, CircuitBreakerService breakerService) {
this(settings, recycler, breakerService, false);
}
public MockBigArrays(Settings settings, PageCacheRecycler recycler, CircuitBreakerService breakerService, boolean checkBreaker) {
super(settings, recycler, breakerService, checkBreaker);
this.settings = settings;
this.recycler = recycler;
this.breakerService = breakerService;
long seed;
try {
seed = SeedUtils.parseSeed(RandomizedContext.current().getRunnerSeedAsString());
@ -115,6 +96,12 @@ public class MockBigArrays extends BigArrays {
INSTANCES.add(this);
}
@Override
public BigArrays withCircuitBreaking() {
return new MockBigArrays(this.settings, this.recycler, this.breakerService, true);
}
@Override
public ByteArray newByteArray(long size, boolean clearOnResize) {
final ByteArrayWrapper array = new ByteArrayWrapper(super.newByteArray(size, clearOnResize), clearOnResize);