From a7542247516d594f612044f33b95db39d27c5393 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 2 Jan 2014 15:04:47 -0700 Subject: [PATCH] Add field data memory circuit breaker. This adds the field data circuit breaker, which is used to estimate the amount of memory required to load field data before loading it. It then raises a CircuitBreakingException if the limit is exceeded. It is configured with two parameters: `indices.fielddata.cache.breaker.limit` - the maximum number of bytes of field data to be loaded before circuit breaking. Defaults to `indices.fielddata.cache.size` if set, unbounded otherwise. `indices.fielddata.cache.breaker.overhead` - a contast for all field data estimations to be multiplied with before aggregation. Defaults to 1.03. Both settings can be configured dynamically using the cluster update settings API. --- docs/reference/cluster/nodes-stats.asciidoc | 5 +- .../index-modules/fielddata.asciidoc | 27 +- .../admin/cluster/node/stats/NodeStats.java | 20 +- .../cluster/node/stats/NodesStatsRequest.java | 17 ++ .../node/stats/TransportNodesStatsAction.java | 3 +- .../stats/TransportClusterStatsAction.java | 2 +- .../ClusterDynamicSettingsModule.java | 3 + .../breaker/CircuitBreakingException.java | 33 +++ .../common/breaker/MemoryCircuitBreaker.java | 180 ++++++++++++++ .../fielddata/AbstractIndexFieldData.java | 42 ++++ .../index/fielddata/IndexFieldData.java | 4 +- .../fielddata/IndexFieldDataService.java | 12 +- .../fielddata/RamAccountingTermsEnum.java | 99 ++++++++ .../index/fielddata/ShardFieldData.java | 12 +- .../plain/DisabledIndexFieldData.java | 5 +- .../plain/DocValuesIndexFieldData.java | 5 +- .../plain/DoubleArrayIndexFieldData.java | 40 ++- .../plain/FSTBytesIndexFieldData.java | 27 +- .../plain/FloatArrayIndexFieldData.java | 40 ++- .../plain/GeoPointBinaryDVIndexFieldData.java | 5 +- .../GeoPointCompressedIndexFieldData.java | 31 ++- .../GeoPointDoubleArrayIndexFieldData.java | 33 ++- .../plain/NonEstimatingEstimator.java | 58 +++++ .../plain/PackedArrayIndexFieldData.java | 128 ++++++++-- .../plain/PagedBytesIndexFieldData.java | 182 +++++++++++++- .../elasticsearch/indices/IndicesModule.java | 4 + .../breaker/CircuitBreakerService.java | 40 +++ .../breaker/FieldDataBreakerStats.java | 102 ++++++++ .../InternalCircuitBreakerService.java | 123 +++++++++ .../cache/IndicesFieldDataCache.java | 21 +- .../node/service/NodeService.java | 15 +- .../node/stats/RestNodesStatsAction.java | 16 ++ .../elasticsearch/search/SearchModule.java | 2 + .../fielddata/LongFieldDataBenchmark.java | 3 +- .../breaker/MemoryCircuitBreakerTests.java | 108 ++++++++ .../aliases/IndexAliasesServiceTests.java | 3 + .../elasticsearch/index/codec/CodecTests.java | 9 + .../fielddata/AbstractFieldDataTests.java | 3 +- .../fielddata/IndexFieldDataServiceTests.java | 7 +- .../index/mapper/MapperTestUtils.java | 3 +- .../query/SimpleIndexQueryParserTests.java | 3 + .../guice/IndexQueryParserModuleTests.java | 3 + .../plugin/IndexQueryParserPlugin2Tests.java | 3 + .../plugin/IndexQueryParserPluginTests.java | 4 +- .../search/FieldDataTermsFilterTests.java | 3 +- .../index/similarity/SimilarityTests.java | 10 +- .../breaker/CircuitBreakerServiceTests.java | 148 +++++++++++ .../breaker/DummyCircuitBreakerService.java | 47 ++++ .../RandomExceptionCircuitBreakerTests.java | 235 ++++++++++++++++++ .../search/geo/GeoDistanceTests.java | 14 +- .../test/ElasticsearchIntegrationTest.java | 16 ++ 51 files changed, 1835 insertions(+), 123 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java create mode 100644 src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java create mode 100644 src/main/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerService.java create mode 100644 src/main/java/org/elasticsearch/indices/fielddata/breaker/FieldDataBreakerStats.java create mode 100644 src/main/java/org/elasticsearch/indices/fielddata/breaker/InternalCircuitBreakerService.java create mode 100644 src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java create mode 100644 src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java create mode 100644 src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java create mode 100644 src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java diff --git a/docs/reference/cluster/nodes-stats.asciidoc b/docs/reference/cluster/nodes-stats.asciidoc index 75f8eea4515..b819e8c03e8 100644 --- a/docs/reference/cluster/nodes-stats.asciidoc +++ b/docs/reference/cluster/nodes-stats.asciidoc @@ -23,7 +23,7 @@ second command selectively retrieves nodes stats of only `nodeId1` and <>. By default, `indices` stats are returned. With options for `indices`, -`os`, `process`, `jvm`, `network`, `transport`, `http`, `fs`, and +`os`, `process`, `jvm`, `network`, `transport`, `http`, `fs`, `breaker`, and `thread_pool`. For example: [horizontal] @@ -60,6 +60,9 @@ By default, `indices` stats are returned. With options for `indices`, Transport statistics about sent and received bytes in cluster communication +`breaker`:: + Statistics about the field data circuit breaker + `clear`:: Clears all the flags (first). Useful, if you only want to retrieve specific stats. diff --git a/docs/reference/index-modules/fielddata.asciidoc b/docs/reference/index-modules/fielddata.asciidoc index 2ac2b4f6340..156c13fbe20 100644 --- a/docs/reference/index-modules/fielddata.asciidoc +++ b/docs/reference/index-modules/fielddata.asciidoc @@ -237,9 +237,34 @@ The `frequency` and `regex` filters can be combined: } -------------------------------------------------- +[float] +[[field-data-circuit-breaker]] +=== Field data circuit breaker +The field data circuit breaker allows Elasticsearch to estimate the amount of +memory a field will required to be loaded into memory. It can then prevent the +field data loading by raising and exception. By default it is configured with +no limit (-1 bytes), but is automatically set to `indices.fielddata.cache.size` +if set. It can be configured with the following parameters: + +[cols="<,<",options="header",] +|======================================================================= +|Setting |Description +|`indices.fielddata.breaker.limit` |Maximum size of estimated field data +to allow loading. Defaults to `indices.fielddata.cache.size` if set, unbounded +if not. +|`indices.fielddata.breaker.overhead` |A constant that all field data +estimations are multiplied with to determine a final estimation. Defaults to +1.03 +|======================================================================= + +Both the `indices.fielddata.breaker.limit` and +`indices.fielddata.breaker.overhead` can be changed dynamically using the +cluster update settings API. + [float] [[field-data-monitoring]] === Monitoring field data -You can monitor memory usage for field data using +You can monitor memory usage for field data as well as the field data circuit +breaker using <> diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 149fdc5fda8..5d8004472c0 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.NodeIndicesStats; +import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats; import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.network.NetworkStats; @@ -76,12 +77,16 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { @Nullable private HttpStats http; + @Nullable + private FieldDataBreakerStats breaker; + NodeStats() { } public NodeStats(DiscoveryNode node, long timestamp, @Nullable String hostname, @Nullable NodeIndicesStats indices, - @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, @Nullable NetworkStats network, - @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http) { + @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, + @Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http, + @Nullable FieldDataBreakerStats breaker) { super(node); this.timestamp = timestamp; this.hostname = hostname; @@ -94,6 +99,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { this.fs = fs; this.transport = transport; this.http = http; + this.breaker = breaker; } public long getTimestamp() { @@ -171,6 +177,11 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { return this.http; } + @Nullable + public FieldDataBreakerStats getBreaker() { + return this.breaker; + } + public static NodeStats readNodeStats(StreamInput in) throws IOException { NodeStats nodeInfo = new NodeStats(); nodeInfo.readFrom(in); @@ -211,6 +222,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { if (in.readBoolean()) { http = HttpStats.readHttpStats(in); } + breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in); } @Override @@ -277,6 +289,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { out.writeBoolean(true); http.writeTo(out); } + out.writeOptionalStreamable(breaker); } @Override @@ -326,6 +339,9 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { if (getHttp() != null) { getHttp().toXContent(builder, params); } + if (getBreaker() != null) { + getBreaker().toXContent(builder, params); + } return builder; } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 8fc499e050f..f45a97196c6 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -40,6 +40,7 @@ public class NodesStatsRequest extends NodesOperationRequest private boolean fs; private boolean transport; private boolean http; + private boolean breaker; protected NodesStatsRequest() { } @@ -65,6 +66,7 @@ public class NodesStatsRequest extends NodesOperationRequest this.fs = true; this.transport = true; this.http = true; + this.breaker = true; return this; } @@ -81,6 +83,7 @@ public class NodesStatsRequest extends NodesOperationRequest this.fs = false; this.transport = false; this.http = false; + this.breaker = false; return this; } @@ -225,6 +228,18 @@ public class NodesStatsRequest extends NodesOperationRequest return this; } + public boolean breaker() { + return this.breaker; + } + + /** + * Should the node's circuit breaker stats be returned. + */ + public NodesStatsRequest breaker(boolean breaker) { + this.breaker = breaker; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -237,6 +252,7 @@ public class NodesStatsRequest extends NodesOperationRequest fs = in.readBoolean(); transport = in.readBoolean(); http = in.readBoolean(); + breaker = in.readBoolean(); } @Override @@ -251,6 +267,7 @@ public class NodesStatsRequest extends NodesOperationRequest out.writeBoolean(fs); out.writeBoolean(transport); out.writeBoolean(http); + out.writeBoolean(breaker); } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 488b67966a0..4c93df069d2 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -97,7 +97,8 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction shardsStats = new ArrayList(); for (String index : indicesService.indices()) { IndexService indexService = indicesService.indexService(index); diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index e23ef23c1d1..dfccf3c1ee5 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -29,6 +29,7 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; +import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; /** @@ -75,6 +76,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule { clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED); clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME); clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED); + clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.BYTES_SIZE); + clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); } public void addDynamicSettings(String... settings) { diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java new file mode 100644 index 00000000000..08afbc275f5 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -0,0 +1,33 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.ElasticSearchException; + +/** + * Exception thrown when the circuit breaker trips + */ +public class CircuitBreakingException extends ElasticSearchException { + + // TODO: maybe add more neat metrics here? + public CircuitBreakingException(String message) { + super(message); + } +} diff --git a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java new file mode 100644 index 00000000000..33cb61872ce --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java @@ -0,0 +1,180 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * MemoryCircuitBreaker is a circuit breaker that breaks once a + * configurable memory limit has been reached. + */ +public class MemoryCircuitBreaker { + + private final long memoryBytesLimit; + private final double overheadConstant; + private final AtomicLong used; + private final ESLogger logger; + + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. This breaker starts with 0 bytes used. + * @param limit circuit breaker limit + * @param overheadConstant constant multiplier for byte estimations + */ + public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, ESLogger logger) { + this.memoryBytesLimit = limit.bytes(); + this.overheadConstant = overheadConstant; + this.used = new AtomicLong(0); + this.logger = logger; + if (logger.isTraceEnabled()) { + logger.trace("Creating MemoryCircuitBreaker with a limit of {} bytes ({}) and a overhead constant of {}", + this.memoryBytesLimit, limit, this.overheadConstant); + } + } + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. Uses the given oldBreaker to initialize + * the starting offset. + * @param limit circuit breaker limit + * @param overheadConstant constant multiplier for byte estimations + * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset) + */ + public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, MemoryCircuitBreaker oldBreaker, ESLogger logger) { + this.memoryBytesLimit = limit.bytes(); + this.overheadConstant = overheadConstant; + if (oldBreaker == null) { + this.used = new AtomicLong(0); + } else { + this.used = oldBreaker.used; + } + this.logger = logger; + if (logger.isTraceEnabled()) { + logger.trace("Creating MemoryCircuitBreaker with a limit of {} bytes ({}) and a overhead constant of {}", + this.memoryBytesLimit, limit, this.overheadConstant); + } + } + + /** + * Method used to trip the breaker + * @throws CircuitBreakingException + */ + public void circuitBreak() throws CircuitBreakingException { + throw new CircuitBreakingException("Data too large, data would be larger than limit of [" + + memoryBytesLimit + "] bytes"); + } + + /** + * Add a number of bytes, tripping the circuit breaker if the aggregated + * estimates are above the limit. Automatically trips the breaker if the + * memory limit is set to 0. Will never trip the breaker if the limit is + * set < 0, but can still be used to aggregate estimations. + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + * @throws CircuitBreakingException + */ + public double addEstimateBytesAndMaybeBreak(long bytes) throws CircuitBreakingException { + // short-circuit on no data allowed, immediately throwing an exception + if (memoryBytesLimit == 0) { + circuitBreak(); + } + + long newUsed; + // If there is no limit (-1), we can optimize a bit by using + // .addAndGet() instead of looping (because we don't have to check a + // limit), which makes the RamAccountingTermsEnum case faster. + if (this.memoryBytesLimit == -1) { + newUsed = this.used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("Adding [{}] to used bytes [new used: [{}], limit: [-1b]]", + new ByteSizeValue(bytes), new ByteSizeValue(newUsed)); + } + return newUsed; + } + + // Otherwise, check the addition and commit the addition, looping if + // there are conflicts. May result in additional logging, but it's + // trace logging and shouldn't be counted on for additions. + long currentUsed; + do { + currentUsed = this.used.get(); + newUsed = currentUsed + bytes; + long newUsedWithOverhead = (long)(newUsed * overheadConstant); + if (logger.isTraceEnabled()) { + logger.trace("Adding [{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", + new ByteSizeValue(bytes), new ByteSizeValue(newUsed), + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); + } + if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { + logger.error("New used memory {} [{}] would be larger than configured breaker: {} [{}], breaking", + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); + circuitBreak(); + } + // Attempt to set the new used value, but make sure it hasn't changed + // underneath us, if it has, keep trying until we are able to set it + } while (!this.used.compareAndSet(currentUsed, newUsed)); + + return newUsed; + } + + /** + * Add an exact number of bytes, not checking for tripping the + * circuit breaker. This bypasses the overheadConstant multiplication. + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + */ + public long addWithoutBreaking(long bytes) { + long u = used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("Adjusted breaker by [{}] bytes, now [{}]", bytes, u); + } + assert u >= 0 : "Used bytes: [" + u + "] must be >= 0"; + return u; + } + + /** + * @return the number of aggregated "used" bytes so far + */ + public long getUsed() { + return this.used.get(); + } + + /** + * @return the maximum number of bytes before the circuit breaker will trip + */ + public long getMaximum() { + return this.memoryBytesLimit; + } + + /** + * @return the constant multiplier the breaker uses for aggregations + */ + public double getOverhead() { + return this.overheadConstant; + } +} diff --git a/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java index ce73dd0e0fc..d36b1b9d085 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java @@ -2,6 +2,9 @@ package org.elasticsearch.index.fielddata; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.AbstractIndexComponent; @@ -9,6 +12,8 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import java.io.IOException; + /** */ public abstract class AbstractIndexFieldData extends AbstractIndexComponent implements IndexFieldData { @@ -53,4 +58,41 @@ public abstract class AbstractIndexFieldData extends } } + /** + * A {@code PerValueEstimator} is a sub-class that can be used to estimate + * the memory overhead for loading the data. Each field data + * implementation should implement its own {@code PerValueEstimator} if it + * intends to take advantage of the MemoryCircuitBreaker. + *

+ * Note that the .beforeLoad(...) and .afterLoad(...) methods must be + * manually called. + */ + public interface PerValueEstimator { + + /** + * @return the number of bytes for the given term + */ + public long bytesPerValue(BytesRef term); + + /** + * Execute any pre-loading estimations for the terms. May also + * optionally wrap a {@link TermsEnum} in a + * {@link RamAccountingTermsEnum} + * which will estimate the memory on a per-term basis. + * + * @param terms terms to be estimated + * @return A TermsEnum for the given terms + * @throws IOException + */ + public TermsEnum beforeLoad(Terms terms) throws IOException; + + /** + * Possibly adjust a circuit breaker after field data has been loaded, + * now that the actual amount of memory used by the field data is known + * + * @param termsEnum terms that were loaded + * @param actualUsed actual field data memory usage + */ + public void afterLoad(TermsEnum termsEnum, long actualUsed); + } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index bbccb2a9887..a3050f6f4e9 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexComponent; import org.elasticsearch.index.fielddata.fieldcomparator.SortMode; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ @@ -170,7 +171,8 @@ public interface IndexFieldData extends IndexCompone interface Builder { - IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache); + IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService); } public interface WithOrdinals extends IndexFieldData { diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 091dc8d4efb..b0e36e1c822 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -54,6 +55,7 @@ public class IndexFieldDataService extends AbstractIndexComponent { private final static ImmutableMap buildersByType; private final static ImmutableMap docValuesBuildersByType; private final static ImmutableMap, IndexFieldData.Builder> buildersByTypeAndFormat; + private final CircuitBreakerService circuitBreakerService; static { buildersByType = MapBuilder.newMapBuilder() @@ -123,14 +125,16 @@ public class IndexFieldDataService extends AbstractIndexComponent { IndexService indexService; // public for testing - public IndexFieldDataService(Index index) { - this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS)); + public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) { + this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService); } @Inject - public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache) { + public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache, + CircuitBreakerService circuitBreakerService) { super(index, indexSettings); this.indicesFieldDataCache = indicesFieldDataCache; + this.circuitBreakerService = circuitBreakerService; } // we need to "inject" the index service to not create cyclic dep @@ -231,7 +235,7 @@ public class IndexFieldDataService extends AbstractIndexComponent { fieldDataCaches.put(fieldNames.indexName(), cache); } - fieldData = builder.build(index, indexSettings, mapper, cache); + fieldData = builder.build(index, indexSettings, mapper, cache, circuitBreakerService); loadedFieldData.put(fieldNames.indexName(), fieldData); } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java new file mode 100644 index 00000000000..d734d55e4db --- /dev/null +++ b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java @@ -0,0 +1,99 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.fielddata; + +import org.apache.lucene.index.FilteredTermsEnum; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.index.fielddata.AbstractIndexFieldData; + +import java.io.IOException; + +/** + * {@link TermsEnum} that takes a MemoryCircuitBreaker, increasing the breaker + * every time {@code .next(...)} is called. Proxies all methods to the original + * TermsEnum otherwise. + */ +public final class RamAccountingTermsEnum extends FilteredTermsEnum { + + // Flush every 1mb + private static final long FLUSH_BUFFER_SIZE = 1024 * 1024; + + private final MemoryCircuitBreaker breaker; + private final TermsEnum termsEnum; + private final AbstractIndexFieldData.PerValueEstimator estimator; + private long totalBytes; + private long flushBuffer; + + + public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator) { + super(termsEnum); + this.breaker = breaker; + this.termsEnum = termsEnum; + this.estimator = estimator; + this.totalBytes = 0; + this.flushBuffer = 0; + } + + /** + * Always accept the term. + */ + @Override + protected AcceptStatus accept(BytesRef term) throws IOException { + return AcceptStatus.YES; + } + + /** + * Flush the {@code flushBuffer} to the breaker, incrementing the total + * bytes and resetting the buffer. + */ + public void flush() { + breaker.addEstimateBytesAndMaybeBreak(this.flushBuffer); + this.totalBytes += this.flushBuffer; + this.flushBuffer = 0; + } + + /** + * Proxy to the original next() call, but estimates the overhead of + * loading the next term. + */ + @Override + public BytesRef next() throws IOException { + BytesRef term = termsEnum.next(); + if (term == null && this.flushBuffer != 0) { + // We have reached the end of the termsEnum, flush the buffer + flush(); + } else { + this.flushBuffer += estimator.bytesPerValue(term); + if (this.flushBuffer >= FLUSH_BUFFER_SIZE) { + flush(); + } + } + return term; + } + + /** + * @return the total number of bytes that have been aggregated + */ + public long getTotalBytes() { + return this.totalBytes; + } +} diff --git a/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java index 7958927bacd..cca61e465b0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java @@ -26,10 +26,13 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData; +import org.elasticsearch.index.fielddata.plain.PagedBytesAtomicFieldData; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -43,9 +46,12 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index final ConcurrentMap perFieldTotals = ConcurrentCollections.newConcurrentMap(); + private final CircuitBreakerService breakerService; + @Inject - public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) { + public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) { super(shardId, indexSettings); + this.breakerService = breakerService; } public FieldDataStats stats(String... fields) { @@ -89,6 +95,10 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index evictionsMetric.inc(); } if (sizeInBytes != -1) { + // Since field data is being unloaded (due to expiration or manual + // clearing), we also need to decrement the used bytes in the breaker + breakerService.getBreaker().addWithoutBreaking(-sizeInBytes); + totalMetric.dec(sizeInBytes); String keyFieldName = fieldNames.indexName(); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java index 9f324ad5670..68474b0c1bd 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.fielddata.fieldcomparator.SortMode; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** * A field data implementation that forbids loading and will throw an {@link ElasticSearchIllegalStateException} if you try to load @@ -37,7 +38,9 @@ public final class DisabledIndexFieldData extends AbstractIndexFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + // Ignore Circuit Breaker return new DisabledIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java index 410b65c5cbd..75f213ed597 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.Set; @@ -77,7 +78,9 @@ public abstract class DocValuesIndexFieldData { } @Override - public IndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + // Ignore Circuit Breaker final FieldMapper.Names fieldNames = mapper.names(); final Settings fdSettings = mapper.fieldDataType().getSettings(); final Map filter = fdSettings.getGroups("filter"); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java index 5cb5312660a..5a21035aed0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java @@ -35,21 +35,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData implements IndexNumericFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new DoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + return new DoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public DoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public DoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -69,8 +75,13 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new FSTBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new FSTBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - FSTBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + FSTBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -57,8 +63,13 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData fstBuilder = new org.apache.lucene.util.fst.Builder(INPUT_TYPE.BYTE1, outputs); @@ -72,6 +83,7 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData implements IndexNumericFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new FloatArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + return new FloatArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public FloatArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public FloatArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -68,8 +74,13 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + // Ignore breaker final FieldMapper.Names fieldNames = mapper.names(); return new GeoPointBinaryDVIndexFieldData(index, fieldNames); } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java index 601c27c1276..4c5e3d6607f 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ @@ -45,11 +46,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField private static final String PRECISION_KEY = "precision"; private static final Distance DEFAULT_PRECISION_VALUE = new Distance(1, DistanceUnit.CENTIMETERS); + private final CircuitBreakerService breakerService; public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { FieldDataType type = mapper.fieldDataType(); final String precisionAsString = type.getSettings().get(PRECISION_KEY); final Distance precision; @@ -58,15 +61,18 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField } else { precision = DEFAULT_PRECISION_VALUE; } - return new GeoPointCompressedIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, precision); + return new GeoPointCompressedIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, precision, breakerService); } } private final GeoPointFieldMapper.Encoding encoding; - public GeoPointCompressedIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, Distance precision) { + public GeoPointCompressedIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, Distance precision, + CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); this.encoding = GeoPointFieldMapper.Encoding.of(precision); + this.breakerService = breakerService; } @Override @@ -74,8 +80,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField AtomicReader reader = context.reader(); Terms terms = reader.terms(getFieldNames().indexName()); + AtomicGeoPointFieldData data = null; + // TODO: Use an actual estimator to estimate before loading. + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); if (terms == null) { - return new Empty(reader.maxDoc()); + data = new Empty(reader.maxDoc()); + estimator.afterLoad(null, data.getMemorySizeInBytes()); + return data; } final long initialSize; if (terms.size() >= 0) { @@ -88,6 +99,7 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField PagedMutable lon = new PagedMutable(initialSize, pageSize, encoding.numBitsPerCoordinate(), PackedInts.COMPACT); final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO); OrdinalsBuilder builder = new OrdinalsBuilder(terms.size(), reader.maxDoc(), acceptableTransientOverheadRatio); + boolean success = false; try { final GeoPointEnum iter = new GeoPointEnum(builder.buildFromTerms(terms.iterator(null))); GeoPoint point; @@ -116,18 +128,23 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField } FixedBitSet set = builder.buildDocsWithValuesSet(); if (set == null) { - return new GeoPointCompressedAtomicFieldData.Single(encoding, sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); + data = new GeoPointCompressedAtomicFieldData.Single(encoding, sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); } else { - return new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); + data = new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); } } else { if (lat.size() != build.getMaxOrd()) { lat = lat.resize(build.getMaxOrd()); lon = lon.resize(build.getMaxOrd()); } - return new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build); + data = new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build); } + success = true; + return data; } finally { + if (success) { + estimator.afterLoad(null, data.getMemorySizeInBytes()); + } builder.close(); } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java index 4ac005f0580..d049f58cc55 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java @@ -33,21 +33,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new GeoPointDoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + return new GeoPointDoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public GeoPointDoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public GeoPointDoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -55,8 +61,13 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel AtomicReader reader = context.reader(); Terms terms = reader.terms(getFieldNames().indexName()); + AtomicGeoPointFieldData data = null; + // TODO: Use an actual estimator to estimate before loading. + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); if (terms == null) { - return new Empty(reader.maxDoc()); + data = new Empty(reader.maxDoc()); + estimator.afterLoad(null, data.getMemorySizeInBytes()); + return data; } final BigDoubleArrayList lat = new BigDoubleArrayList(); final BigDoubleArrayList lon = new BigDoubleArrayList(); @@ -64,6 +75,7 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel lon.add(0); // first "t" indicates null value final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO); OrdinalsBuilder builder = new OrdinalsBuilder(terms.size(), reader.maxDoc(), acceptableTransientOverheadRatio); + boolean success = false; try { final GeoPointEnum iter = new GeoPointEnum(builder.buildFromTerms(terms.iterator(null))); GeoPoint point; @@ -85,16 +97,19 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel } FixedBitSet set = builder.buildDocsWithValuesSet(); if (set == null) { - return new GeoPointDoubleArrayAtomicFieldData.Single(sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); + data = new GeoPointDoubleArrayAtomicFieldData.Single(sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); } else { - return new GeoPointDoubleArrayAtomicFieldData.SingleFixedSet(sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); + data = new GeoPointDoubleArrayAtomicFieldData.SingleFixedSet(sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); } } else { - return new GeoPointDoubleArrayAtomicFieldData.WithOrdinals( - lon, lat, - reader.maxDoc(), build); + data = new GeoPointDoubleArrayAtomicFieldData.WithOrdinals(lon, lat, reader.maxDoc(), build); } + success = true; + return data; } finally { + if (success) { + estimator.afterLoad(null, data.getMemorySizeInBytes()); + } builder.close(); } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java new file mode 100644 index 00000000000..5a5d8fb4582 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java @@ -0,0 +1,58 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.fielddata.plain; + +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.index.fielddata.AbstractIndexFieldData; + +import java.io.IOException; + +/** + * Estimator that does nothing except for adjust the breaker after the field + * data has been loaded. Useful for field data implementations that do not yet + * have pre-loading estimations. + */ +public class NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator { + + private final MemoryCircuitBreaker breaker; + + NonEstimatingEstimator(MemoryCircuitBreaker breaker) { + this.breaker = breaker; + } + + @Override + public long bytesPerValue(BytesRef term) { + return 0; + } + + @Override + public TermsEnum beforeLoad(Terms terms) throws IOException { + return null; + } + + @Override + public void afterLoad(@Nullable TermsEnum termsEnum, long actualUsed) { + breaker.addWithoutBreaking(actualUsed); + } +} diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java index 499cee8f5ad..caa5876a32d 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.apache.lucene.util.FixedBitSet; @@ -30,8 +31,10 @@ import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer; import org.apache.lucene.util.packed.PackedInts; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.fielddata.RamAccountingTermsEnum; import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.fielddata.fieldcomparator.LongValuesComparatorSource; import org.elasticsearch.index.fielddata.fieldcomparator.SortMode; @@ -40,7 +43,9 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import java.io.IOException; import java.util.EnumSet; /** @@ -58,18 +63,23 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new PackedArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, numericType); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new PackedArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, numericType, breakerService); } } private final NumericType numericType; + private final CircuitBreakerService breakerService; - public PackedArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, NumericType numericType) { + public PackedArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, NumericType numericType, + CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); Preconditions.checkNotNull(numericType); Preconditions.checkArgument(EnumSet.of(NumericType.BYTE, NumericType.SHORT, NumericType.INT, NumericType.LONG).contains(numericType), getClass().getSimpleName() + " only supports integer types, not " + numericType); this.numericType = numericType; + this.breakerService = breakerService; } @Override @@ -88,8 +98,12 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData 32; @@ -156,28 +172,38 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new PagedBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new PagedBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override public PagedBytesAtomicFieldData loadDirect(AtomicReaderContext context) throws Exception { AtomicReader reader = context.reader(); + PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker()); Terms terms = reader.terms(getFieldNames().indexName()); if (terms == null) { - return PagedBytesAtomicFieldData.empty(reader.maxDoc()); + PagedBytesAtomicFieldData emptyData = PagedBytesAtomicFieldData.empty(reader.maxDoc()); + estimator.adjustForNoTerms(emptyData.getMemorySizeInBytes()); + return emptyData; } final PagedBytes bytes = new PagedBytes(15); @@ -68,12 +80,22 @@ public class PagedBytesIndexFieldData extends AbstractBytesIndexFieldData implements CircuitBreakerService { + + public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit"; + public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead"; + + public static final double DEFAULT_OVERHEAD_CONSTANT = 1.03; + + private volatile MemoryCircuitBreaker breaker; + private volatile long maxBytes; + private volatile double overhead; + + @Inject + public InternalCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService, IndicesFieldDataCache fieldDataCache) { + super(settings); + long fieldDataMax = fieldDataCache.computeSizeInBytes(); + this.maxBytes = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, new ByteSizeValue(fieldDataMax)).bytes(); + this.overhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, DEFAULT_OVERHEAD_CONSTANT); + + this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, null, logger); + + nodeSettingsService.addListener(new ApplySettings()); + } + + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + // clear breaker now that settings have changed + ByteSizeValue newMaxByteSizeValue = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, null); + boolean breakerResetNeeded = false; + + if (newMaxByteSizeValue != null) { + logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_MAX_BYTES_SETTING, + new ByteSizeValue(InternalCircuitBreakerService.this.maxBytes), newMaxByteSizeValue); + InternalCircuitBreakerService.this.maxBytes = newMaxByteSizeValue.bytes(); + breakerResetNeeded = true; + } + + double newOverhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, overhead); + if (newOverhead != overhead) { + logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_OVERHEAD_SETTING, + overhead, newOverhead); + InternalCircuitBreakerService.this.overhead = newOverhead; + breakerResetNeeded = true; + } + + if (breakerResetNeeded) { + resetBreaker(); + } + } + } + + /** + * @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage + */ + public MemoryCircuitBreaker getBreaker() { + return this.breaker; + } + + /** + * Reset the breaker, creating a new one and initializing its used value + * to the actual field data usage, or the existing estimated usage if the + * actual value is not available. Will not trip the breaker even if the + * used value is higher than the limit for the breaker. + */ + public synchronized void resetBreaker() { + final MemoryCircuitBreaker oldBreaker = this.breaker; + // discard old breaker by creating a new one and pre-populating from the current breaker + this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, oldBreaker, logger); + } + + @Override + public FieldDataBreakerStats stats() { + return new FieldDataBreakerStats(breaker.getMaximum(), breaker.getUsed(), breaker.getOverhead()); + } + + @Override + protected void doStart() throws ElasticSearchException { + } + + @Override + protected void doStop() throws ElasticSearchException { + } + + @Override + protected void doClose() throws ElasticSearchException { + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 0b2d4d95a62..d97a8184bff 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -23,16 +23,14 @@ import com.google.common.cache.*; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.SegmentReader; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.fielddata.AtomicFieldData; -import org.elasticsearch.index.fielddata.FieldDataType; -import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; @@ -47,6 +45,8 @@ import java.util.concurrent.TimeUnit; */ public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener { + private static final long JVM_HEAP_MAX_BYTES = JvmInfo.jvmInfo().getMem().getHeapMax().bytes(); + Cache cache; private volatile String size; @@ -59,7 +59,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL super(settings); this.size = componentSettings.get("size", "-1"); this.expire = componentSettings.getAsTime("expire", null); - computeSizeInBytes(); + this.sizeInBytes = computeSizeInBytes(); buildCache(); } @@ -78,14 +78,17 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL cache = cacheBuilder.build(); } - private void computeSizeInBytes() { + /** + * @return the maximum configured size for the field data cache, in bytes, or -1 if not set + */ + public long computeSizeInBytes() { if (size.equals("-1")) { - sizeInBytes = -1; + return -1; } else if (size.endsWith("%")) { double percent = Double.parseDouble(size.substring(0, size.length() - 1)); - sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); + return (long) ((percent / 100) * JVM_HEAP_MAX_BYTES); } else { - sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes(); + return ByteSizeValue.parseBytesSizeValue(size).bytes(); } } diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java index b0e95a5cbb4..ebfd752c65b 100644 --- a/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -36,6 +36,7 @@ import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -55,6 +56,8 @@ public class NodeService extends AbstractComponent { private final PluginsService pluginService; + private final CircuitBreakerService circuitBreakerService; + @Nullable private HttpServer httpServer; @@ -70,7 +73,7 @@ public class NodeService extends AbstractComponent { @Inject public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, TransportService transportService, IndicesService indicesService, - PluginsService pluginService, Version version) { + PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -84,6 +87,7 @@ public class NodeService extends AbstractComponent { } this.version = version; this.pluginService = pluginService; + this.circuitBreakerService = circuitBreakerService; } public void setHttpServer(@Nullable HttpServer httpServer) { @@ -156,11 +160,13 @@ public class NodeService extends AbstractComponent { monitorService.networkService().stats(), monitorService.fsService().stats(), transportService.stats(), - httpServer == null ? null : httpServer.stats() + httpServer == null ? null : httpServer.stats(), + circuitBreakerService.stats() ); } - public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean fs, boolean transport, boolean http) { + public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, + boolean fs, boolean transport, boolean http, boolean circuitBreaker) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname, @@ -172,7 +178,8 @@ public class NodeService extends AbstractComponent { network ? monitorService.networkService().stats() : null, fs ? monitorService.fsService().stats() : null, transport ? transportService.stats() : null, - http ? (httpServer == null ? null : httpServer.stats()) : null + http ? (httpServer == null ? null : httpServer.stats()) : null, + circuitBreaker ? circuitBreakerService.stats() : null ); } } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java index 9535bca4c53..b4448582f5d 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java @@ -116,6 +116,12 @@ public class RestNodesStatsAction extends BaseRestHandler { controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/http", httpHandler); controller.registerHandler(RestRequest.Method.GET, "/_nodes/http/stats", httpHandler); controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/http/stats", httpHandler); + + RestBreakerHandler breakerHandler = new RestBreakerHandler(); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/stats/breaker", breakerHandler); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/breaker", breakerHandler); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/breaker/stats", breakerHandler); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/breaker/stats", breakerHandler); } @Override @@ -141,6 +147,7 @@ public class RestNodesStatsAction extends BaseRestHandler { nodesStatsRequest.fs(request.paramAsBoolean("fs", nodesStatsRequest.fs())); nodesStatsRequest.transport(request.paramAsBoolean("transport", nodesStatsRequest.transport())); nodesStatsRequest.http(request.paramAsBoolean("http", nodesStatsRequest.http())); + nodesStatsRequest.breaker(request.paramAsBoolean("breaker", nodesStatsRequest.breaker())); executeNodeStats(request, channel, nodesStatsRequest); } @@ -264,4 +271,13 @@ public class RestNodesStatsAction extends BaseRestHandler { executeNodeStats(request, channel, nodesStatsRequest); } } + + class RestBreakerHandler implements RestHandler { + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(Strings.splitStringByCommaToArray(request.param("nodeId"))); + nodesStatsRequest.clear().breaker(true); + executeNodeStats(request, channel, nodesStatsRequest); + } + } } diff --git a/src/main/java/org/elasticsearch/search/SearchModule.java b/src/main/java/org/elasticsearch/search/SearchModule.java index 37c8ccec333..5f0c61fcf35 100644 --- a/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/src/main/java/org/elasticsearch/search/SearchModule.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.index.query.functionscore.FunctionScoreModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.aggregations.AggregationModule; import org.elasticsearch.search.controller.SearchPhaseController; diff --git a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java index 49002aff0b5..88c00f2e03b 100644 --- a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.core.LongFieldMapper; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import java.util.Random; @@ -135,7 +136,7 @@ public class LongFieldDataBenchmark { indexWriter.close(); final DirectoryReader dr = DirectoryReader.open(dir); - final IndexFieldDataService fds = new IndexFieldDataService(new Index("dummy")); + final IndexFieldDataService fds = new IndexFieldDataService(new Index("dummy"), new DummyCircuitBreakerService()); final LongFieldMapper mapper = new LongFieldMapper.Builder(fieldName).build(new BuilderContext(null, new ContentPath(1))); final IndexNumericFieldData fd = fds.getForField(mapper); final long start = System.nanoTime(); diff --git a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java new file mode 100644 index 00000000000..07647ee2123 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java @@ -0,0 +1,108 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the Memory Aggregating Circuit Breaker + */ +public class MemoryCircuitBreakerTests extends ElasticsearchTestCase { + + @Test + public void testThreadedUpdatesToBreaker() throws Exception { + final int NUM_THREADS = 5; + final int BYTES_PER_THREAD = 1000; + final Thread[] threads = new Thread[NUM_THREADS]; + final AtomicBoolean tripped = new AtomicBoolean(false); + final AtomicReference lastException = new AtomicReference(null); + + final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue((BYTES_PER_THREAD * NUM_THREADS) - 1), 1.0, logger); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L); + } catch (CircuitBreakingException e) { + if (tripped.get()) { + assertThat("tripped too many times", true, equalTo(false)); + } else { + assert tripped.compareAndSet(false, true); + } + } catch (Throwable e2) { + lastException.set(e2); + } + } + } + }); + + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); + assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true)); + } + + @Test + public void testConstantFactor() throws Exception { + final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger); + + // add only 7 bytes + breaker.addWithoutBreaking(7); + + try { + // this won't actually add it because it trips the breaker + breaker.addEstimateBytesAndMaybeBreak(3); + fail("should never reach this"); + } catch (CircuitBreakingException cbe) { + assert true; + } + + // shouldn't throw an exception + breaker.addEstimateBytesAndMaybeBreak(2); + + assertThat(breaker.getUsed(), equalTo(9L)); + + // adding 3 more bytes (now at 12) + breaker.addWithoutBreaking(3); + + try { + // Adding no bytes still breaks + breaker.addEstimateBytesAndMaybeBreak(0); + fail("should never reach this"); + } catch (CircuitBreakingException cbe) { + assert true; + } + } +} diff --git a/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java b/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java index 1a4303ad13f..135ffa20bf7 100644 --- a/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java +++ b/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java @@ -46,6 +46,8 @@ import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -83,6 +85,7 @@ public class IndexAliasesServiceTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/codec/CodecTests.java b/src/test/java/org/elasticsearch/index/codec/CodecTests.java index a3b374c942b..c0a01fcc796 100644 --- a/src/test/java/org/elasticsearch/index/codec/CodecTests.java +++ b/src/test/java/org/elasticsearch/index/codec/CodecTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.codecs.memory.MemoryPostingsFormat; import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat; import org.apache.lucene.codecs.pulsing.Pulsing41PostingsFormat; import org.apache.lucene.codecs.simpletext.SimpleTextCodec; +import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.ImmutableSettings; @@ -52,6 +53,8 @@ import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.junit.Before; import org.junit.Test; @@ -414,6 +417,12 @@ public class CodecTests extends ElasticsearchLuceneTestCase { .add(new CodecModule(settings)) .add(new MapperServiceModule()) .add(new AnalysisModule(settings)) + .add(new AbstractModule() { + @Override + protected void configure() { + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); + } + }) .createInjector(); return injector.getInstance(CodecService.class); } diff --git a/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java b/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java index 6e71a9e524a..1a067c963cc 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java +++ b/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.MapperBuilders; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.After; import org.junit.Before; @@ -76,7 +77,7 @@ public abstract class AbstractFieldDataTests extends ElasticsearchTestCase { @Before public void setup() throws Exception { - ifdService = new IndexFieldDataService(new Index("test")); + ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); // LogByteSizeMP to preserve doc ID order writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, new StandardAnalyzer(Lucene.VERSION)).setMergePolicy(new LogByteSizeMergePolicy())); } diff --git a/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java b/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java index 1e12bfbd054..616d226a716 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java +++ b/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.MapperBuilders; import org.elasticsearch.index.mapper.core.*; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import java.util.Arrays; @@ -49,7 +50,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase { @SuppressWarnings("unchecked") public void testGetForFieldDefaults() { - final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test")); + final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); for (boolean docValues : Arrays.asList(true, false)) { final BuilderContext ctx = new BuilderContext(null, new ContentPath(1)); final StringFieldMapper stringMapper = new StringFieldMapper.Builder("string").tokenized(false).fieldDataSettings(docValues ? DOC_VALUES_SETTINGS : ImmutableSettings.EMPTY).build(ctx); @@ -98,7 +99,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase { @SuppressWarnings("unchecked") public void testByPassDocValues() { - final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test")); + final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); final BuilderContext ctx = new BuilderContext(null, new ContentPath(1)); final StringFieldMapper stringMapper = MapperBuilders.stringField("string").tokenized(false).fieldDataSettings(DOC_VALUES_SETTINGS).fieldDataSettings(ImmutableSettings.builder().put("format", "fst").build()).build(ctx); ifdService.clear(); @@ -129,7 +130,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase { } public void testChangeFieldDataFormat() throws Exception { - final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test")); + final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); final BuilderContext ctx = new BuilderContext(null, new ContentPath(1)); final StringFieldMapper mapper1 = MapperBuilders.stringField("s").tokenized(false).fieldDataSettings(ImmutableSettings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx); final IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(TEST_VERSION_CURRENT, new KeywordAnalyzer())); diff --git a/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java b/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java index 786c00f2a28..4cd591096c8 100644 --- a/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java +++ b/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityLookupService; import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.analysis.IndicesAnalysisService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; /** * @@ -58,7 +59,7 @@ public class MapperTestUtils { } public static MapperService newMapperService(Index index, Settings indexSettings) { - return new MapperService(index, indexSettings, new Environment(), newAnalysisService(), new IndexFieldDataService(index), + return new MapperService(index, indexSettings, new Environment(), newAnalysisService(), new IndexFieldDataService(index, new DummyCircuitBreakerService()), new PostingsFormatService(index), new DocValuesFormatService(index), newSimilarityLookupService()); } diff --git a/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java b/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java index bd9ac9cad2e..8ed2cdfa18d 100644 --- a/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java +++ b/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java @@ -61,6 +61,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -118,6 +120,7 @@ public class SimpleIndexQueryParserTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java b/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java index 3d491554d5d..218c0161c48 100644 --- a/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java +++ b/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java @@ -40,6 +40,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -84,6 +86,7 @@ public class IndexQueryParserModuleTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java index 8eedbd7061b..726c49408b5 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java @@ -41,6 +41,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -82,6 +84,7 @@ public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java index ab1649a8fad..b6aa8a6290f 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java @@ -41,12 +41,13 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; import org.junit.Test; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; /** @@ -91,6 +92,7 @@ public class IndexQueryParserPluginTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java b/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java index 7bb7e31fff4..1890e091d03 100644 --- a/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java +++ b/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.core.DoubleFieldMapper; import org.elasticsearch.index.mapper.core.LongFieldMapper; import org.elasticsearch.index.mapper.core.NumberFieldMapper; import org.elasticsearch.index.mapper.core.StringFieldMapper; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.After; @@ -78,7 +79,7 @@ public class FieldDataTermsFilterTests extends ElasticsearchTestCase { .build(new Mapper.BuilderContext(null, new ContentPath(1))); // create index and fielddata service - ifdService = new IndexFieldDataService(new Index("test")); + ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, new StandardAnalyzer(Lucene.VERSION))); diff --git a/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java b/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java index 92d74456b96..474d312de93 100644 --- a/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java +++ b/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.similarity; import org.apache.lucene.search.similarities.*; +import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.ImmutableSettings; @@ -33,6 +34,8 @@ import org.elasticsearch.index.codec.CodecModule; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperServiceModule; import org.elasticsearch.index.settings.IndexSettingsModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -40,7 +43,6 @@ import java.io.IOException; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; public class SimilarityTests extends ElasticsearchTestCase { @@ -163,6 +165,12 @@ public class SimilarityTests extends ElasticsearchTestCase { .add(new MapperServiceModule()) .add(new AnalysisModule(settings)) .add(new SimilarityModule(settings)) + .add(new AbstractModule() { + @Override + protected void configure() { + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); + } + }) .createInjector(); return injector.getInstance(SimilarityService.class); } diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java new file mode 100644 index 00000000000..72c86563c0a --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java @@ -0,0 +1,148 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.breaker; + +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +/** + * Integration tests for InternalCircuitBreakerService + */ +public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { + + @Test + @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testMemoryBreaker() { + assertAcked(prepareCreate("cb-test", 1)); + final Client client = client(); + + try { + + // index some different terms so we have some field data for loading + int docCount = atLeast(300); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + try { + client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + fail("should not reach this point"); + } catch (SearchPhaseExecutionException e) { + assert true; + } + + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + @Test + @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testRamAccountingTermsEnum() { + final Client client = client(); + + try { + + // Create an index where the mappings have a field data filter + client.admin().indices().prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + + "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}").execute().actionGet(); + + // Wait 10 seconds for green + client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = atLeast(300); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("ramtest", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + try { + client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + fail("should not reach this point"); + } catch (SearchPhaseExecutionException e) { + assert true; + } + + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java new file mode 100644 index 00000000000..62345d1d0af --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java @@ -0,0 +1,47 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.breaker; + +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.ByteSizeValue; + +/** + * Class that returns a breaker that never breaks + */ +public class DummyCircuitBreakerService implements CircuitBreakerService { + + private final ESLogger logger = Loggers.getLogger(DummyCircuitBreakerService.class); + + private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger); + + public DummyCircuitBreakerService() {} + + @Override + public MemoryCircuitBreaker getBreaker() { + return breaker; + } + + @Override + public FieldDataBreakerStats stats() { + return new FieldDataBreakerStats(-1, -1, 0); + } +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java new file mode 100644 index 00000000000..219a96a234e --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java @@ -0,0 +1,235 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.breaker; + +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.util.English; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.engine.MockRobinEngine; +import org.elasticsearch.test.engine.ThrowingAtomicReaderWrapper; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.store.MockDirectoryHelper; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the circuit breaker while random exceptions are happening + */ +public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegrationTest { + + @Test + @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException { + final int numShards = between(1, 5); + String mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("type") + .startObject("properties") + .startObject("test-str") + .field("type", "string") + .field("index", "not_analyzed") + .startObject("fielddata") + .field("format", randomBytesFieldDataFormat()) + .endObject() // fielddata + .endObject() // test-str + .startObject("test-num") + // I don't use randomNumericType() here because I don't want "byte", and I want "float" and "double" + .field("type", randomFrom(Arrays.asList("float", "long", "double", "short", "integer"))) + .startObject("fielddata") + .field("format", randomNumericFieldDataFormat()) + .endObject() // fielddata + .endObject() // test-num + .endObject() // properties + .endObject() // type + .endObject() // {} + .string(); + final double exceptionRate; + final double exceptionOnOpenRate; + if (frequently()) { + if (randomBoolean()) { + if (randomBoolean()) { + exceptionOnOpenRate = 1.0/between(5, 100); + exceptionRate = 0.0d; + } else { + exceptionRate = 1.0/between(5, 100); + exceptionOnOpenRate = 0.0d; + } + } else { + exceptionOnOpenRate = 1.0/between(5, 100); + exceptionRate = 1.0/between(5, 100); + } + } else { + // rarely no exception + exceptionRate = 0d; + exceptionOnOpenRate = 0d; + } + + ImmutableSettings.Builder settings = settingsBuilder() + .put("index.number_of_shards", numShards) + .put("index.number_of_replicas", randomIntBetween(0, 1)) + .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate) + .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate) + .put(MockDirectoryHelper.CHECK_INDEX_ON_CLOSE, true); + logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap()); + client().admin().indices().prepareCreate("test") + .setSettings(settings) + .addMapping("type", mapping).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster() + .health(Requests.clusterHealthRequest().waitForYellowStatus().timeout(TimeValue.timeValueSeconds(5))).get(); // it's OK to timeout here + final int numDocs; + if (clusterHealthResponse.isTimedOut()) { + /* some seeds just won't let you create the index at all and we enter a ping-pong mode + * trying one node after another etc. that is ok but we need to make sure we don't wait + * forever when indexing documents so we set numDocs = 1 and expect all shards to fail + * when we search below.*/ + logger.info("ClusterHealth timed out - only index one doc and expect searches to fail"); + numDocs = 1; + } else { + numDocs = between(10, 100); + } + for (int i = 0; i < numDocs ; i++) { + try { + client().prepareIndex("test", "type", "" + i) + .setTimeout(TimeValue.timeValueSeconds(1)).setSource("test-str", randomUnicodeOfLengthBetween(5, 25), "test-num", i).get(); + } catch (ElasticSearchException ex) { + } + } + logger.info("Start Refresh"); + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().get(); // don't assert on failures here + final boolean refreshFailed = refreshResponse.getShardFailures().length != 0 || refreshResponse.getFailedShards() != 0; + logger.info("Refresh failed: [{}] numShardsFailed: [{}], shardFailuresLength: [{}], successfulShards: [{}], totalShards: [{}] ", + refreshFailed, refreshResponse.getFailedShards(), refreshResponse.getShardFailures().length, + refreshResponse.getSuccessfulShards(), refreshResponse.getTotalShards()); + final int numSearches = atLeast(10); + + for (int i = 0; i < numSearches; i++) { + try { + // Sort by the string and numeric fields, to load them into field data + client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()) + .addSort("test-str", SortOrder.ASC) + .addSort("test-num", SortOrder.ASC).get(); + } catch (SearchPhaseExecutionException ex) { + logger.info("expected SearchPhaseException: [{}]", ex.getMessage()); + } + + if (frequently()) { + // Now, clear the cache and check that the circuit breaker has been + // successfully set back to zero. If there is a bug in the circuit + // breaker adjustment code, it should show up here by the breaker + // estimate being either positive or negative. + client().admin().indices().prepareClearCache("test").setFieldDataCache(true).execute().actionGet(); + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().all().execute().actionGet(); + for (NodeStats stats : resp.getNodes()) { + assertThat("Breaker reset to 0", stats.getBreaker().getEstimated(), equalTo(0L)); + } + } + } + } + + public static final String EXCEPTION_TOP_LEVEL_RATIO_KEY = "index.engine.exception.ratio.top"; + public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low"; + + // TODO: Generalize this class and add it as a utility + public static class RandomExceptionDirectoryReaderWrapper extends MockRobinEngine.DirectoryReaderWrapper { + private final Settings settings; + static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingAtomicReaderWrapper.Thrower { + private final Random random; + private final double topLevelRatio; + private final double lowLevelRatio; + + ThrowingSubReaderWrapper(Settings settings) { + final long seed = settings.getAsLong(ElasticsearchIntegrationTest.INDEX_SEED_SETTING, 0l); + this.topLevelRatio = settings.getAsDouble(EXCEPTION_TOP_LEVEL_RATIO_KEY, 0.1d); + this.lowLevelRatio = settings.getAsDouble(EXCEPTION_LOW_LEVEL_RATIO_KEY, 0.1d); + this.random = new Random(seed); + } + + @Override + public AtomicReader wrap(AtomicReader reader) { + return new ThrowingAtomicReaderWrapper(reader, this); + } + + @Override + public void maybeThrow(ThrowingAtomicReaderWrapper.Flags flag) throws IOException { + switch (flag) { + case Fields: + break; + case TermVectors: + break; + case Terms: + case TermsEnum: + if (random.nextDouble() < topLevelRatio) { + throw new IOException("Forced top level Exception on [" + flag.name() + "]"); + } + case Intersect: + break; + case Norms: + break; + case NumericDocValues: + break; + case BinaryDocValues: + break; + case SortedDocValues: + break; + case SortedSetDocValues: + break; + case DocsEnum: + case DocsAndPositionsEnum: + if (random.nextDouble() < lowLevelRatio) { + throw new IOException("Forced low level Exception on [" + flag.name() + "]"); + } + break; + } + + } + } + + public RandomExceptionDirectoryReaderWrapper(DirectoryReader in, Settings settings) { + super(in, new ThrowingSubReaderWrapper(settings)); + this.settings = settings; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) { + return new RandomExceptionDirectoryReaderWrapper(in, settings); + } + } +} diff --git a/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java b/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java index 0f0bd902e63..e0dd2829a17 100644 --- a/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java +++ b/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java @@ -52,15 +52,11 @@ import static org.hamcrest.Matchers.*; */ public class GeoDistanceTests extends ElasticsearchIntegrationTest { - private static String randomFieldDataFormat() { - return randomFrom(Arrays.asList("array", "compressed", "doc_values")); - } - @Test public void simpleDistanceTests() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("location").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject() .endObject().endObject().string(); client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); @@ -212,7 +208,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { public void testDistanceSortingMVFields() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("locations").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject() .endObject().endObject().string(); client().admin().indices().prepareCreate("test") @@ -344,7 +340,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { public void testDistanceSortingWithMissingGeoPoint() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("locations").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject() .endObject().endObject().string(); client().admin().indices().prepareCreate("test") @@ -446,7 +442,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { .startObject("properties") .startObject("name").field("type", "string").endObject() .startObject("location").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject() .endObject() .endObject() .endObject() @@ -621,7 +617,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { .field("geohash_precision", 24) .field("lat_lon", true) .startObject("fielddata") - .field("format", randomFieldDataFormat()) + .field("format", randomNumericFieldDataFormat()) .endObject() .endObject() .endObject() diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 1473fa57cc6..504e7a5bead 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -880,4 +880,20 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return perTestRatio; } + /** + * Returns a random numeric field data format from the choices of "array", + * "compressed", or "doc_values". + */ + public static String randomNumericFieldDataFormat() { + return randomFrom(Arrays.asList("array", "compressed", "doc_values")); + } + + /** + * Returns a random bytes field data format from the choices of + * "paged_bytes", "fst", or "doc_values". + */ + public static String randomBytesFieldDataFormat() { + return randomFrom(Arrays.asList("paged_bytes", "fst", "doc_values")); + } + }