diff --git a/docs/reference/cluster/nodes-stats.asciidoc b/docs/reference/cluster/nodes-stats.asciidoc index 75f8eea4515..b819e8c03e8 100644 --- a/docs/reference/cluster/nodes-stats.asciidoc +++ b/docs/reference/cluster/nodes-stats.asciidoc @@ -23,7 +23,7 @@ second command selectively retrieves nodes stats of only `nodeId1` and <>. By default, `indices` stats are returned. With options for `indices`, -`os`, `process`, `jvm`, `network`, `transport`, `http`, `fs`, and +`os`, `process`, `jvm`, `network`, `transport`, `http`, `fs`, `breaker`, and `thread_pool`. For example: [horizontal] @@ -60,6 +60,9 @@ By default, `indices` stats are returned. With options for `indices`, Transport statistics about sent and received bytes in cluster communication +`breaker`:: + Statistics about the field data circuit breaker + `clear`:: Clears all the flags (first). Useful, if you only want to retrieve specific stats. diff --git a/docs/reference/index-modules/fielddata.asciidoc b/docs/reference/index-modules/fielddata.asciidoc index 2ac2b4f6340..156c13fbe20 100644 --- a/docs/reference/index-modules/fielddata.asciidoc +++ b/docs/reference/index-modules/fielddata.asciidoc @@ -237,9 +237,34 @@ The `frequency` and `regex` filters can be combined: } -------------------------------------------------- +[float] +[[field-data-circuit-breaker]] +=== Field data circuit breaker +The field data circuit breaker allows Elasticsearch to estimate the amount of +memory a field will required to be loaded into memory. It can then prevent the +field data loading by raising and exception. By default it is configured with +no limit (-1 bytes), but is automatically set to `indices.fielddata.cache.size` +if set. It can be configured with the following parameters: + +[cols="<,<",options="header",] +|======================================================================= +|Setting |Description +|`indices.fielddata.breaker.limit` |Maximum size of estimated field data +to allow loading. Defaults to `indices.fielddata.cache.size` if set, unbounded +if not. +|`indices.fielddata.breaker.overhead` |A constant that all field data +estimations are multiplied with to determine a final estimation. Defaults to +1.03 +|======================================================================= + +Both the `indices.fielddata.breaker.limit` and +`indices.fielddata.breaker.overhead` can be changed dynamically using the +cluster update settings API. + [float] [[field-data-monitoring]] === Monitoring field data -You can monitor memory usage for field data using +You can monitor memory usage for field data as well as the field data circuit +breaker using <> diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 149fdc5fda8..5d8004472c0 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.NodeIndicesStats; +import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats; import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.network.NetworkStats; @@ -76,12 +77,16 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { @Nullable private HttpStats http; + @Nullable + private FieldDataBreakerStats breaker; + NodeStats() { } public NodeStats(DiscoveryNode node, long timestamp, @Nullable String hostname, @Nullable NodeIndicesStats indices, - @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, @Nullable NetworkStats network, - @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http) { + @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, + @Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http, + @Nullable FieldDataBreakerStats breaker) { super(node); this.timestamp = timestamp; this.hostname = hostname; @@ -94,6 +99,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { this.fs = fs; this.transport = transport; this.http = http; + this.breaker = breaker; } public long getTimestamp() { @@ -171,6 +177,11 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { return this.http; } + @Nullable + public FieldDataBreakerStats getBreaker() { + return this.breaker; + } + public static NodeStats readNodeStats(StreamInput in) throws IOException { NodeStats nodeInfo = new NodeStats(); nodeInfo.readFrom(in); @@ -211,6 +222,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { if (in.readBoolean()) { http = HttpStats.readHttpStats(in); } + breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in); } @Override @@ -277,6 +289,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { out.writeBoolean(true); http.writeTo(out); } + out.writeOptionalStreamable(breaker); } @Override @@ -326,6 +339,9 @@ public class NodeStats extends NodeOperationResponse implements ToXContent { if (getHttp() != null) { getHttp().toXContent(builder, params); } + if (getBreaker() != null) { + getBreaker().toXContent(builder, params); + } return builder; } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 8fc499e050f..f45a97196c6 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -40,6 +40,7 @@ public class NodesStatsRequest extends NodesOperationRequest private boolean fs; private boolean transport; private boolean http; + private boolean breaker; protected NodesStatsRequest() { } @@ -65,6 +66,7 @@ public class NodesStatsRequest extends NodesOperationRequest this.fs = true; this.transport = true; this.http = true; + this.breaker = true; return this; } @@ -81,6 +83,7 @@ public class NodesStatsRequest extends NodesOperationRequest this.fs = false; this.transport = false; this.http = false; + this.breaker = false; return this; } @@ -225,6 +228,18 @@ public class NodesStatsRequest extends NodesOperationRequest return this; } + public boolean breaker() { + return this.breaker; + } + + /** + * Should the node's circuit breaker stats be returned. + */ + public NodesStatsRequest breaker(boolean breaker) { + this.breaker = breaker; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -237,6 +252,7 @@ public class NodesStatsRequest extends NodesOperationRequest fs = in.readBoolean(); transport = in.readBoolean(); http = in.readBoolean(); + breaker = in.readBoolean(); } @Override @@ -251,6 +267,7 @@ public class NodesStatsRequest extends NodesOperationRequest out.writeBoolean(fs); out.writeBoolean(transport); out.writeBoolean(http); + out.writeBoolean(breaker); } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 488b67966a0..4c93df069d2 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -97,7 +97,8 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction shardsStats = new ArrayList(); for (String index : indicesService.indices()) { IndexService indexService = indicesService.indexService(index); diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index e23ef23c1d1..dfccf3c1ee5 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -29,6 +29,7 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; +import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; /** @@ -75,6 +76,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule { clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED); clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME); clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED); + clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.BYTES_SIZE); + clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE); } public void addDynamicSettings(String... settings) { diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java new file mode 100644 index 00000000000..08afbc275f5 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -0,0 +1,33 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.ElasticSearchException; + +/** + * Exception thrown when the circuit breaker trips + */ +public class CircuitBreakingException extends ElasticSearchException { + + // TODO: maybe add more neat metrics here? + public CircuitBreakingException(String message) { + super(message); + } +} diff --git a/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java new file mode 100644 index 00000000000..33cb61872ce --- /dev/null +++ b/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java @@ -0,0 +1,180 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * MemoryCircuitBreaker is a circuit breaker that breaks once a + * configurable memory limit has been reached. + */ +public class MemoryCircuitBreaker { + + private final long memoryBytesLimit; + private final double overheadConstant; + private final AtomicLong used; + private final ESLogger logger; + + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. This breaker starts with 0 bytes used. + * @param limit circuit breaker limit + * @param overheadConstant constant multiplier for byte estimations + */ + public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, ESLogger logger) { + this.memoryBytesLimit = limit.bytes(); + this.overheadConstant = overheadConstant; + this.used = new AtomicLong(0); + this.logger = logger; + if (logger.isTraceEnabled()) { + logger.trace("Creating MemoryCircuitBreaker with a limit of {} bytes ({}) and a overhead constant of {}", + this.memoryBytesLimit, limit, this.overheadConstant); + } + } + + /** + * Create a circuit breaker that will break if the number of estimated + * bytes grows above the limit. All estimations will be multiplied by + * the given overheadConstant. Uses the given oldBreaker to initialize + * the starting offset. + * @param limit circuit breaker limit + * @param overheadConstant constant multiplier for byte estimations + * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset) + */ + public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, MemoryCircuitBreaker oldBreaker, ESLogger logger) { + this.memoryBytesLimit = limit.bytes(); + this.overheadConstant = overheadConstant; + if (oldBreaker == null) { + this.used = new AtomicLong(0); + } else { + this.used = oldBreaker.used; + } + this.logger = logger; + if (logger.isTraceEnabled()) { + logger.trace("Creating MemoryCircuitBreaker with a limit of {} bytes ({}) and a overhead constant of {}", + this.memoryBytesLimit, limit, this.overheadConstant); + } + } + + /** + * Method used to trip the breaker + * @throws CircuitBreakingException + */ + public void circuitBreak() throws CircuitBreakingException { + throw new CircuitBreakingException("Data too large, data would be larger than limit of [" + + memoryBytesLimit + "] bytes"); + } + + /** + * Add a number of bytes, tripping the circuit breaker if the aggregated + * estimates are above the limit. Automatically trips the breaker if the + * memory limit is set to 0. Will never trip the breaker if the limit is + * set < 0, but can still be used to aggregate estimations. + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + * @throws CircuitBreakingException + */ + public double addEstimateBytesAndMaybeBreak(long bytes) throws CircuitBreakingException { + // short-circuit on no data allowed, immediately throwing an exception + if (memoryBytesLimit == 0) { + circuitBreak(); + } + + long newUsed; + // If there is no limit (-1), we can optimize a bit by using + // .addAndGet() instead of looping (because we don't have to check a + // limit), which makes the RamAccountingTermsEnum case faster. + if (this.memoryBytesLimit == -1) { + newUsed = this.used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("Adding [{}] to used bytes [new used: [{}], limit: [-1b]]", + new ByteSizeValue(bytes), new ByteSizeValue(newUsed)); + } + return newUsed; + } + + // Otherwise, check the addition and commit the addition, looping if + // there are conflicts. May result in additional logging, but it's + // trace logging and shouldn't be counted on for additions. + long currentUsed; + do { + currentUsed = this.used.get(); + newUsed = currentUsed + bytes; + long newUsedWithOverhead = (long)(newUsed * overheadConstant); + if (logger.isTraceEnabled()) { + logger.trace("Adding [{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]", + new ByteSizeValue(bytes), new ByteSizeValue(newUsed), + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit), + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead)); + } + if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) { + logger.error("New used memory {} [{}] would be larger than configured breaker: {} [{}], breaking", + newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), + memoryBytesLimit, new ByteSizeValue(memoryBytesLimit)); + circuitBreak(); + } + // Attempt to set the new used value, but make sure it hasn't changed + // underneath us, if it has, keep trying until we are able to set it + } while (!this.used.compareAndSet(currentUsed, newUsed)); + + return newUsed; + } + + /** + * Add an exact number of bytes, not checking for tripping the + * circuit breaker. This bypasses the overheadConstant multiplication. + * @param bytes number of bytes to add to the breaker + * @return number of "used" bytes so far + */ + public long addWithoutBreaking(long bytes) { + long u = used.addAndGet(bytes); + if (logger.isTraceEnabled()) { + logger.trace("Adjusted breaker by [{}] bytes, now [{}]", bytes, u); + } + assert u >= 0 : "Used bytes: [" + u + "] must be >= 0"; + return u; + } + + /** + * @return the number of aggregated "used" bytes so far + */ + public long getUsed() { + return this.used.get(); + } + + /** + * @return the maximum number of bytes before the circuit breaker will trip + */ + public long getMaximum() { + return this.memoryBytesLimit; + } + + /** + * @return the constant multiplier the breaker uses for aggregations + */ + public double getOverhead() { + return this.overheadConstant; + } +} diff --git a/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java index ce73dd0e0fc..d36b1b9d085 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java @@ -2,6 +2,9 @@ package org.elasticsearch.index.fielddata; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.AbstractIndexComponent; @@ -9,6 +12,8 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import java.io.IOException; + /** */ public abstract class AbstractIndexFieldData extends AbstractIndexComponent implements IndexFieldData { @@ -53,4 +58,41 @@ public abstract class AbstractIndexFieldData extends } } + /** + * A {@code PerValueEstimator} is a sub-class that can be used to estimate + * the memory overhead for loading the data. Each field data + * implementation should implement its own {@code PerValueEstimator} if it + * intends to take advantage of the MemoryCircuitBreaker. + *

+ * Note that the .beforeLoad(...) and .afterLoad(...) methods must be + * manually called. + */ + public interface PerValueEstimator { + + /** + * @return the number of bytes for the given term + */ + public long bytesPerValue(BytesRef term); + + /** + * Execute any pre-loading estimations for the terms. May also + * optionally wrap a {@link TermsEnum} in a + * {@link RamAccountingTermsEnum} + * which will estimate the memory on a per-term basis. + * + * @param terms terms to be estimated + * @return A TermsEnum for the given terms + * @throws IOException + */ + public TermsEnum beforeLoad(Terms terms) throws IOException; + + /** + * Possibly adjust a circuit breaker after field data has been loaded, + * now that the actual amount of memory used by the field data is known + * + * @param termsEnum terms that were loaded + * @param actualUsed actual field data memory usage + */ + public void afterLoad(TermsEnum termsEnum, long actualUsed); + } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java index bbccb2a9887..a3050f6f4e9 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldData.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexComponent; import org.elasticsearch.index.fielddata.fieldcomparator.SortMode; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ @@ -170,7 +171,8 @@ public interface IndexFieldData extends IndexCompone interface Builder { - IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache); + IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService); } public interface WithOrdinals extends IndexFieldData { diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 091dc8d4efb..b0e36e1c822 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -54,6 +55,7 @@ public class IndexFieldDataService extends AbstractIndexComponent { private final static ImmutableMap buildersByType; private final static ImmutableMap docValuesBuildersByType; private final static ImmutableMap, IndexFieldData.Builder> buildersByTypeAndFormat; + private final CircuitBreakerService circuitBreakerService; static { buildersByType = MapBuilder.newMapBuilder() @@ -123,14 +125,16 @@ public class IndexFieldDataService extends AbstractIndexComponent { IndexService indexService; // public for testing - public IndexFieldDataService(Index index) { - this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS)); + public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) { + this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService); } @Inject - public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache) { + public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache, + CircuitBreakerService circuitBreakerService) { super(index, indexSettings); this.indicesFieldDataCache = indicesFieldDataCache; + this.circuitBreakerService = circuitBreakerService; } // we need to "inject" the index service to not create cyclic dep @@ -231,7 +235,7 @@ public class IndexFieldDataService extends AbstractIndexComponent { fieldDataCaches.put(fieldNames.indexName(), cache); } - fieldData = builder.build(index, indexSettings, mapper, cache); + fieldData = builder.build(index, indexSettings, mapper, cache, circuitBreakerService); loadedFieldData.put(fieldNames.indexName(), fieldData); } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java new file mode 100644 index 00000000000..d734d55e4db --- /dev/null +++ b/src/main/java/org/elasticsearch/index/fielddata/RamAccountingTermsEnum.java @@ -0,0 +1,99 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.fielddata; + +import org.apache.lucene.index.FilteredTermsEnum; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.index.fielddata.AbstractIndexFieldData; + +import java.io.IOException; + +/** + * {@link TermsEnum} that takes a MemoryCircuitBreaker, increasing the breaker + * every time {@code .next(...)} is called. Proxies all methods to the original + * TermsEnum otherwise. + */ +public final class RamAccountingTermsEnum extends FilteredTermsEnum { + + // Flush every 1mb + private static final long FLUSH_BUFFER_SIZE = 1024 * 1024; + + private final MemoryCircuitBreaker breaker; + private final TermsEnum termsEnum; + private final AbstractIndexFieldData.PerValueEstimator estimator; + private long totalBytes; + private long flushBuffer; + + + public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator) { + super(termsEnum); + this.breaker = breaker; + this.termsEnum = termsEnum; + this.estimator = estimator; + this.totalBytes = 0; + this.flushBuffer = 0; + } + + /** + * Always accept the term. + */ + @Override + protected AcceptStatus accept(BytesRef term) throws IOException { + return AcceptStatus.YES; + } + + /** + * Flush the {@code flushBuffer} to the breaker, incrementing the total + * bytes and resetting the buffer. + */ + public void flush() { + breaker.addEstimateBytesAndMaybeBreak(this.flushBuffer); + this.totalBytes += this.flushBuffer; + this.flushBuffer = 0; + } + + /** + * Proxy to the original next() call, but estimates the overhead of + * loading the next term. + */ + @Override + public BytesRef next() throws IOException { + BytesRef term = termsEnum.next(); + if (term == null && this.flushBuffer != 0) { + // We have reached the end of the termsEnum, flush the buffer + flush(); + } else { + this.flushBuffer += estimator.bytesPerValue(term); + if (this.flushBuffer >= FLUSH_BUFFER_SIZE) { + flush(); + } + } + return term; + } + + /** + * @return the total number of bytes that have been aggregated + */ + public long getTotalBytes() { + return this.totalBytes; + } +} diff --git a/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java index 7958927bacd..cca61e465b0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java @@ -26,10 +26,13 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData; +import org.elasticsearch.index.fielddata.plain.PagedBytesAtomicFieldData; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -43,9 +46,12 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index final ConcurrentMap perFieldTotals = ConcurrentCollections.newConcurrentMap(); + private final CircuitBreakerService breakerService; + @Inject - public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) { + public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) { super(shardId, indexSettings); + this.breakerService = breakerService; } public FieldDataStats stats(String... fields) { @@ -89,6 +95,10 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index evictionsMetric.inc(); } if (sizeInBytes != -1) { + // Since field data is being unloaded (due to expiration or manual + // clearing), we also need to decrement the used bytes in the breaker + breakerService.getBreaker().addWithoutBreaking(-sizeInBytes); + totalMetric.dec(sizeInBytes); String keyFieldName = fieldNames.indexName(); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java index 9f324ad5670..68474b0c1bd 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DisabledIndexFieldData.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.fielddata.fieldcomparator.SortMode; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** * A field data implementation that forbids loading and will throw an {@link ElasticSearchIllegalStateException} if you try to load @@ -37,7 +38,9 @@ public final class DisabledIndexFieldData extends AbstractIndexFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + // Ignore Circuit Breaker return new DisabledIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java index 410b65c5cbd..75f213ed597 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DocValuesIndexFieldData.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.mapper.FieldMapper.Names; import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.Set; @@ -77,7 +78,9 @@ public abstract class DocValuesIndexFieldData { } @Override - public IndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + // Ignore Circuit Breaker final FieldMapper.Names fieldNames = mapper.names(); final Settings fdSettings = mapper.fieldDataType().getSettings(); final Map filter = fdSettings.getGroups("filter"); diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java index 5cb5312660a..5a21035aed0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/DoubleArrayIndexFieldData.java @@ -35,21 +35,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData implements IndexNumericFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new DoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + return new DoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public DoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public DoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -69,8 +75,13 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new FSTBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new FSTBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - FSTBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + FSTBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -57,8 +63,13 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData fstBuilder = new org.apache.lucene.util.fst.Builder(INPUT_TYPE.BYTE1, outputs); @@ -72,6 +83,7 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData implements IndexNumericFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new FloatArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + return new FloatArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public FloatArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public FloatArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -68,8 +74,13 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData build(Index index, Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + // Ignore breaker final FieldMapper.Names fieldNames = mapper.names(); return new GeoPointBinaryDVIndexFieldData(index, fieldNames); } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java index 601c27c1276..4c5e3d6607f 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointCompressedIndexFieldData.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ @@ -45,11 +46,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField private static final String PRECISION_KEY = "precision"; private static final Distance DEFAULT_PRECISION_VALUE = new Distance(1, DistanceUnit.CENTIMETERS); + private final CircuitBreakerService breakerService; public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { FieldDataType type = mapper.fieldDataType(); final String precisionAsString = type.getSettings().get(PRECISION_KEY); final Distance precision; @@ -58,15 +61,18 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField } else { precision = DEFAULT_PRECISION_VALUE; } - return new GeoPointCompressedIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, precision); + return new GeoPointCompressedIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, precision, breakerService); } } private final GeoPointFieldMapper.Encoding encoding; - public GeoPointCompressedIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, Distance precision) { + public GeoPointCompressedIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, Distance precision, + CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); this.encoding = GeoPointFieldMapper.Encoding.of(precision); + this.breakerService = breakerService; } @Override @@ -74,8 +80,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField AtomicReader reader = context.reader(); Terms terms = reader.terms(getFieldNames().indexName()); + AtomicGeoPointFieldData data = null; + // TODO: Use an actual estimator to estimate before loading. + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); if (terms == null) { - return new Empty(reader.maxDoc()); + data = new Empty(reader.maxDoc()); + estimator.afterLoad(null, data.getMemorySizeInBytes()); + return data; } final long initialSize; if (terms.size() >= 0) { @@ -88,6 +99,7 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField PagedMutable lon = new PagedMutable(initialSize, pageSize, encoding.numBitsPerCoordinate(), PackedInts.COMPACT); final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO); OrdinalsBuilder builder = new OrdinalsBuilder(terms.size(), reader.maxDoc(), acceptableTransientOverheadRatio); + boolean success = false; try { final GeoPointEnum iter = new GeoPointEnum(builder.buildFromTerms(terms.iterator(null))); GeoPoint point; @@ -116,18 +128,23 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField } FixedBitSet set = builder.buildDocsWithValuesSet(); if (set == null) { - return new GeoPointCompressedAtomicFieldData.Single(encoding, sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); + data = new GeoPointCompressedAtomicFieldData.Single(encoding, sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); } else { - return new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); + data = new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); } } else { if (lat.size() != build.getMaxOrd()) { lat = lat.resize(build.getMaxOrd()); lon = lon.resize(build.getMaxOrd()); } - return new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build); + data = new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build); } + success = true; + return data; } finally { + if (success) { + estimator.afterLoad(null, data.getMemorySizeInBytes()); + } builder.close(); } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java index 4ac005f0580..d049f58cc55 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/GeoPointDoubleArrayIndexFieldData.java @@ -33,21 +33,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; /** */ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new GeoPointDoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache, + CircuitBreakerService breakerService) { + return new GeoPointDoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public GeoPointDoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public GeoPointDoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override @@ -55,8 +61,13 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel AtomicReader reader = context.reader(); Terms terms = reader.terms(getFieldNames().indexName()); + AtomicGeoPointFieldData data = null; + // TODO: Use an actual estimator to estimate before loading. + NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker()); if (terms == null) { - return new Empty(reader.maxDoc()); + data = new Empty(reader.maxDoc()); + estimator.afterLoad(null, data.getMemorySizeInBytes()); + return data; } final BigDoubleArrayList lat = new BigDoubleArrayList(); final BigDoubleArrayList lon = new BigDoubleArrayList(); @@ -64,6 +75,7 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel lon.add(0); // first "t" indicates null value final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO); OrdinalsBuilder builder = new OrdinalsBuilder(terms.size(), reader.maxDoc(), acceptableTransientOverheadRatio); + boolean success = false; try { final GeoPointEnum iter = new GeoPointEnum(builder.buildFromTerms(terms.iterator(null))); GeoPoint point; @@ -85,16 +97,19 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel } FixedBitSet set = builder.buildDocsWithValuesSet(); if (set == null) { - return new GeoPointDoubleArrayAtomicFieldData.Single(sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); + data = new GeoPointDoubleArrayAtomicFieldData.Single(sLon, sLat, reader.maxDoc(), ordinals.getNumOrds()); } else { - return new GeoPointDoubleArrayAtomicFieldData.SingleFixedSet(sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); + data = new GeoPointDoubleArrayAtomicFieldData.SingleFixedSet(sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds()); } } else { - return new GeoPointDoubleArrayAtomicFieldData.WithOrdinals( - lon, lat, - reader.maxDoc(), build); + data = new GeoPointDoubleArrayAtomicFieldData.WithOrdinals(lon, lat, reader.maxDoc(), build); } + success = true; + return data; } finally { + if (success) { + estimator.afterLoad(null, data.getMemorySizeInBytes()); + } builder.close(); } diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java new file mode 100644 index 00000000000..5a5d8fb4582 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/NonEstimatingEstimator.java @@ -0,0 +1,58 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.fielddata.plain; + +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.index.fielddata.AbstractIndexFieldData; + +import java.io.IOException; + +/** + * Estimator that does nothing except for adjust the breaker after the field + * data has been loaded. Useful for field data implementations that do not yet + * have pre-loading estimations. + */ +public class NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator { + + private final MemoryCircuitBreaker breaker; + + NonEstimatingEstimator(MemoryCircuitBreaker breaker) { + this.breaker = breaker; + } + + @Override + public long bytesPerValue(BytesRef term) { + return 0; + } + + @Override + public TermsEnum beforeLoad(Terms terms) throws IOException { + return null; + } + + @Override + public void afterLoad(@Nullable TermsEnum termsEnum, long actualUsed) { + breaker.addWithoutBreaking(actualUsed); + } +} diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java index 499cee8f5ad..caa5876a32d 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/PackedArrayIndexFieldData.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.apache.lucene.util.FixedBitSet; @@ -30,8 +31,10 @@ import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer; import org.apache.lucene.util.packed.PackedInts; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.fielddata.RamAccountingTermsEnum; import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.fielddata.fieldcomparator.LongValuesComparatorSource; import org.elasticsearch.index.fielddata.fieldcomparator.SortMode; @@ -40,7 +43,9 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs; import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import java.io.IOException; import java.util.EnumSet; /** @@ -58,18 +63,23 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new PackedArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, numericType); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new PackedArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, numericType, breakerService); } } private final NumericType numericType; + private final CircuitBreakerService breakerService; - public PackedArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, NumericType numericType) { + public PackedArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, NumericType numericType, + CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); Preconditions.checkNotNull(numericType); Preconditions.checkArgument(EnumSet.of(NumericType.BYTE, NumericType.SHORT, NumericType.INT, NumericType.LONG).contains(numericType), getClass().getSimpleName() + " only supports integer types, not " + numericType); this.numericType = numericType; + this.breakerService = breakerService; } @Override @@ -88,8 +98,12 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData 32; @@ -156,28 +172,38 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData { + private final CircuitBreakerService breakerService; + public static class Builder implements IndexFieldData.Builder { @Override - public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, IndexFieldDataCache cache) { - return new PagedBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache); + public IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper mapper, + IndexFieldDataCache cache, CircuitBreakerService breakerService) { + return new PagedBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService); } } - public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) { + public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, + FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) { super(index, indexSettings, fieldNames, fieldDataType, cache); + this.breakerService = breakerService; } @Override public PagedBytesAtomicFieldData loadDirect(AtomicReaderContext context) throws Exception { AtomicReader reader = context.reader(); + PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker()); Terms terms = reader.terms(getFieldNames().indexName()); if (terms == null) { - return PagedBytesAtomicFieldData.empty(reader.maxDoc()); + PagedBytesAtomicFieldData emptyData = PagedBytesAtomicFieldData.empty(reader.maxDoc()); + estimator.adjustForNoTerms(emptyData.getMemorySizeInBytes()); + return emptyData; } final PagedBytes bytes = new PagedBytes(15); @@ -68,12 +80,22 @@ public class PagedBytesIndexFieldData extends AbstractBytesIndexFieldData implements CircuitBreakerService { + + public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit"; + public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead"; + + public static final double DEFAULT_OVERHEAD_CONSTANT = 1.03; + + private volatile MemoryCircuitBreaker breaker; + private volatile long maxBytes; + private volatile double overhead; + + @Inject + public InternalCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService, IndicesFieldDataCache fieldDataCache) { + super(settings); + long fieldDataMax = fieldDataCache.computeSizeInBytes(); + this.maxBytes = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, new ByteSizeValue(fieldDataMax)).bytes(); + this.overhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, DEFAULT_OVERHEAD_CONSTANT); + + this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, null, logger); + + nodeSettingsService.addListener(new ApplySettings()); + } + + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + // clear breaker now that settings have changed + ByteSizeValue newMaxByteSizeValue = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, null); + boolean breakerResetNeeded = false; + + if (newMaxByteSizeValue != null) { + logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_MAX_BYTES_SETTING, + new ByteSizeValue(InternalCircuitBreakerService.this.maxBytes), newMaxByteSizeValue); + InternalCircuitBreakerService.this.maxBytes = newMaxByteSizeValue.bytes(); + breakerResetNeeded = true; + } + + double newOverhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, overhead); + if (newOverhead != overhead) { + logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_OVERHEAD_SETTING, + overhead, newOverhead); + InternalCircuitBreakerService.this.overhead = newOverhead; + breakerResetNeeded = true; + } + + if (breakerResetNeeded) { + resetBreaker(); + } + } + } + + /** + * @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage + */ + public MemoryCircuitBreaker getBreaker() { + return this.breaker; + } + + /** + * Reset the breaker, creating a new one and initializing its used value + * to the actual field data usage, or the existing estimated usage if the + * actual value is not available. Will not trip the breaker even if the + * used value is higher than the limit for the breaker. + */ + public synchronized void resetBreaker() { + final MemoryCircuitBreaker oldBreaker = this.breaker; + // discard old breaker by creating a new one and pre-populating from the current breaker + this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, oldBreaker, logger); + } + + @Override + public FieldDataBreakerStats stats() { + return new FieldDataBreakerStats(breaker.getMaximum(), breaker.getUsed(), breaker.getOverhead()); + } + + @Override + protected void doStart() throws ElasticSearchException { + } + + @Override + protected void doStop() throws ElasticSearchException { + } + + @Override + protected void doClose() throws ElasticSearchException { + } +} diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 0b2d4d95a62..d97a8184bff 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -23,16 +23,14 @@ import com.google.common.cache.*; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.SegmentReader; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.fielddata.AtomicFieldData; -import org.elasticsearch.index.fielddata.FieldDataType; -import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; @@ -47,6 +45,8 @@ import java.util.concurrent.TimeUnit; */ public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener { + private static final long JVM_HEAP_MAX_BYTES = JvmInfo.jvmInfo().getMem().getHeapMax().bytes(); + Cache cache; private volatile String size; @@ -59,7 +59,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL super(settings); this.size = componentSettings.get("size", "-1"); this.expire = componentSettings.getAsTime("expire", null); - computeSizeInBytes(); + this.sizeInBytes = computeSizeInBytes(); buildCache(); } @@ -78,14 +78,17 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL cache = cacheBuilder.build(); } - private void computeSizeInBytes() { + /** + * @return the maximum configured size for the field data cache, in bytes, or -1 if not set + */ + public long computeSizeInBytes() { if (size.equals("-1")) { - sizeInBytes = -1; + return -1; } else if (size.endsWith("%")) { double percent = Double.parseDouble(size.substring(0, size.length() - 1)); - sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); + return (long) ((percent / 100) * JVM_HEAP_MAX_BYTES); } else { - sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes(); + return ByteSizeValue.parseBytesSizeValue(size).bytes(); } } diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java index b0e95a5cbb4..ebfd752c65b 100644 --- a/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -36,6 +36,7 @@ import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -55,6 +56,8 @@ public class NodeService extends AbstractComponent { private final PluginsService pluginService; + private final CircuitBreakerService circuitBreakerService; + @Nullable private HttpServer httpServer; @@ -70,7 +73,7 @@ public class NodeService extends AbstractComponent { @Inject public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, TransportService transportService, IndicesService indicesService, - PluginsService pluginService, Version version) { + PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -84,6 +87,7 @@ public class NodeService extends AbstractComponent { } this.version = version; this.pluginService = pluginService; + this.circuitBreakerService = circuitBreakerService; } public void setHttpServer(@Nullable HttpServer httpServer) { @@ -156,11 +160,13 @@ public class NodeService extends AbstractComponent { monitorService.networkService().stats(), monitorService.fsService().stats(), transportService.stats(), - httpServer == null ? null : httpServer.stats() + httpServer == null ? null : httpServer.stats(), + circuitBreakerService.stats() ); } - public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean fs, boolean transport, boolean http) { + public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, + boolean fs, boolean transport, boolean http, boolean circuitBreaker) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname, @@ -172,7 +178,8 @@ public class NodeService extends AbstractComponent { network ? monitorService.networkService().stats() : null, fs ? monitorService.fsService().stats() : null, transport ? transportService.stats() : null, - http ? (httpServer == null ? null : httpServer.stats()) : null + http ? (httpServer == null ? null : httpServer.stats()) : null, + circuitBreaker ? circuitBreakerService.stats() : null ); } } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java index 9535bca4c53..b4448582f5d 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java @@ -116,6 +116,12 @@ public class RestNodesStatsAction extends BaseRestHandler { controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/http", httpHandler); controller.registerHandler(RestRequest.Method.GET, "/_nodes/http/stats", httpHandler); controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/http/stats", httpHandler); + + RestBreakerHandler breakerHandler = new RestBreakerHandler(); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/stats/breaker", breakerHandler); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/breaker", breakerHandler); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/breaker/stats", breakerHandler); + controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/breaker/stats", breakerHandler); } @Override @@ -141,6 +147,7 @@ public class RestNodesStatsAction extends BaseRestHandler { nodesStatsRequest.fs(request.paramAsBoolean("fs", nodesStatsRequest.fs())); nodesStatsRequest.transport(request.paramAsBoolean("transport", nodesStatsRequest.transport())); nodesStatsRequest.http(request.paramAsBoolean("http", nodesStatsRequest.http())); + nodesStatsRequest.breaker(request.paramAsBoolean("breaker", nodesStatsRequest.breaker())); executeNodeStats(request, channel, nodesStatsRequest); } @@ -264,4 +271,13 @@ public class RestNodesStatsAction extends BaseRestHandler { executeNodeStats(request, channel, nodesStatsRequest); } } + + class RestBreakerHandler implements RestHandler { + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(Strings.splitStringByCommaToArray(request.param("nodeId"))); + nodesStatsRequest.clear().breaker(true); + executeNodeStats(request, channel, nodesStatsRequest); + } + } } diff --git a/src/main/java/org/elasticsearch/search/SearchModule.java b/src/main/java/org/elasticsearch/search/SearchModule.java index 37c8ccec333..5f0c61fcf35 100644 --- a/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/src/main/java/org/elasticsearch/search/SearchModule.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.index.query.functionscore.FunctionScoreModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.aggregations.AggregationModule; import org.elasticsearch.search.controller.SearchPhaseController; diff --git a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java index 49002aff0b5..88c00f2e03b 100644 --- a/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/fielddata/LongFieldDataBenchmark.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.core.LongFieldMapper; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import java.util.Random; @@ -135,7 +136,7 @@ public class LongFieldDataBenchmark { indexWriter.close(); final DirectoryReader dr = DirectoryReader.open(dir); - final IndexFieldDataService fds = new IndexFieldDataService(new Index("dummy")); + final IndexFieldDataService fds = new IndexFieldDataService(new Index("dummy"), new DummyCircuitBreakerService()); final LongFieldMapper mapper = new LongFieldMapper.Builder(fieldName).build(new BuilderContext(null, new ContentPath(1))); final IndexNumericFieldData fd = fds.getForField(mapper); final long start = System.nanoTime(); diff --git a/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java new file mode 100644 index 00000000000..07647ee2123 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/breaker/MemoryCircuitBreakerTests.java @@ -0,0 +1,108 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the Memory Aggregating Circuit Breaker + */ +public class MemoryCircuitBreakerTests extends ElasticsearchTestCase { + + @Test + public void testThreadedUpdatesToBreaker() throws Exception { + final int NUM_THREADS = 5; + final int BYTES_PER_THREAD = 1000; + final Thread[] threads = new Thread[NUM_THREADS]; + final AtomicBoolean tripped = new AtomicBoolean(false); + final AtomicReference lastException = new AtomicReference(null); + + final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue((BYTES_PER_THREAD * NUM_THREADS) - 1), 1.0, logger); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L); + } catch (CircuitBreakingException e) { + if (tripped.get()) { + assertThat("tripped too many times", true, equalTo(false)); + } else { + assert tripped.compareAndSet(false, true); + } + } catch (Throwable e2) { + lastException.set(e2); + } + } + } + }); + + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); + assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true)); + } + + @Test + public void testConstantFactor() throws Exception { + final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger); + + // add only 7 bytes + breaker.addWithoutBreaking(7); + + try { + // this won't actually add it because it trips the breaker + breaker.addEstimateBytesAndMaybeBreak(3); + fail("should never reach this"); + } catch (CircuitBreakingException cbe) { + assert true; + } + + // shouldn't throw an exception + breaker.addEstimateBytesAndMaybeBreak(2); + + assertThat(breaker.getUsed(), equalTo(9L)); + + // adding 3 more bytes (now at 12) + breaker.addWithoutBreaking(3); + + try { + // Adding no bytes still breaks + breaker.addEstimateBytesAndMaybeBreak(0); + fail("should never reach this"); + } catch (CircuitBreakingException cbe) { + assert true; + } + } +} diff --git a/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java b/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java index 1a4303ad13f..135ffa20bf7 100644 --- a/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java +++ b/src/test/java/org/elasticsearch/index/aliases/IndexAliasesServiceTests.java @@ -46,6 +46,8 @@ import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -83,6 +85,7 @@ public class IndexAliasesServiceTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/codec/CodecTests.java b/src/test/java/org/elasticsearch/index/codec/CodecTests.java index a3b374c942b..c0a01fcc796 100644 --- a/src/test/java/org/elasticsearch/index/codec/CodecTests.java +++ b/src/test/java/org/elasticsearch/index/codec/CodecTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.codecs.memory.MemoryPostingsFormat; import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat; import org.apache.lucene.codecs.pulsing.Pulsing41PostingsFormat; import org.apache.lucene.codecs.simpletext.SimpleTextCodec; +import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.ImmutableSettings; @@ -52,6 +53,8 @@ import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.junit.Before; import org.junit.Test; @@ -414,6 +417,12 @@ public class CodecTests extends ElasticsearchLuceneTestCase { .add(new CodecModule(settings)) .add(new MapperServiceModule()) .add(new AnalysisModule(settings)) + .add(new AbstractModule() { + @Override + protected void configure() { + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); + } + }) .createInjector(); return injector.getInstance(CodecService.class); } diff --git a/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java b/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java index 6e71a9e524a..1a067c963cc 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java +++ b/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.MapperBuilders; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.After; import org.junit.Before; @@ -76,7 +77,7 @@ public abstract class AbstractFieldDataTests extends ElasticsearchTestCase { @Before public void setup() throws Exception { - ifdService = new IndexFieldDataService(new Index("test")); + ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); // LogByteSizeMP to preserve doc ID order writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, new StandardAnalyzer(Lucene.VERSION)).setMergePolicy(new LogByteSizeMergePolicy())); } diff --git a/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java b/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java index 1e12bfbd054..616d226a716 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java +++ b/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.MapperBuilders; import org.elasticsearch.index.mapper.core.*; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import java.util.Arrays; @@ -49,7 +50,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase { @SuppressWarnings("unchecked") public void testGetForFieldDefaults() { - final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test")); + final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); for (boolean docValues : Arrays.asList(true, false)) { final BuilderContext ctx = new BuilderContext(null, new ContentPath(1)); final StringFieldMapper stringMapper = new StringFieldMapper.Builder("string").tokenized(false).fieldDataSettings(docValues ? DOC_VALUES_SETTINGS : ImmutableSettings.EMPTY).build(ctx); @@ -98,7 +99,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase { @SuppressWarnings("unchecked") public void testByPassDocValues() { - final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test")); + final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); final BuilderContext ctx = new BuilderContext(null, new ContentPath(1)); final StringFieldMapper stringMapper = MapperBuilders.stringField("string").tokenized(false).fieldDataSettings(DOC_VALUES_SETTINGS).fieldDataSettings(ImmutableSettings.builder().put("format", "fst").build()).build(ctx); ifdService.clear(); @@ -129,7 +130,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase { } public void testChangeFieldDataFormat() throws Exception { - final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test")); + final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); final BuilderContext ctx = new BuilderContext(null, new ContentPath(1)); final StringFieldMapper mapper1 = MapperBuilders.stringField("s").tokenized(false).fieldDataSettings(ImmutableSettings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx); final IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(TEST_VERSION_CURRENT, new KeywordAnalyzer())); diff --git a/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java b/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java index 786c00f2a28..4cd591096c8 100644 --- a/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java +++ b/src/test/java/org/elasticsearch/index/mapper/MapperTestUtils.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityLookupService; import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.analysis.IndicesAnalysisService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; /** * @@ -58,7 +59,7 @@ public class MapperTestUtils { } public static MapperService newMapperService(Index index, Settings indexSettings) { - return new MapperService(index, indexSettings, new Environment(), newAnalysisService(), new IndexFieldDataService(index), + return new MapperService(index, indexSettings, new Environment(), newAnalysisService(), new IndexFieldDataService(index, new DummyCircuitBreakerService()), new PostingsFormatService(index), new DocValuesFormatService(index), newSimilarityLookupService()); } diff --git a/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java b/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java index bd9ac9cad2e..8ed2cdfa18d 100644 --- a/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java +++ b/src/test/java/org/elasticsearch/index/query/SimpleIndexQueryParserTests.java @@ -61,6 +61,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -118,6 +120,7 @@ public class SimpleIndexQueryParserTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java b/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java index 3d491554d5d..218c0161c48 100644 --- a/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java +++ b/src/test/java/org/elasticsearch/index/query/guice/IndexQueryParserModuleTests.java @@ -40,6 +40,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -84,6 +86,7 @@ public class IndexQueryParserModuleTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java index 8eedbd7061b..726c49408b5 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java @@ -41,6 +41,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -82,6 +84,7 @@ public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java index ab1649a8fad..b6aa8a6290f 100644 --- a/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java +++ b/src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java @@ -41,12 +41,13 @@ import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; import org.junit.Test; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; /** @@ -91,6 +92,7 @@ public class IndexQueryParserPluginTests extends ElasticsearchTestCase { @Override protected void configure() { bind(ClusterService.class).toProvider(Providers.of((ClusterService) null)); + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); } } ).createInjector(); diff --git a/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java b/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java index 7bb7e31fff4..1890e091d03 100644 --- a/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java +++ b/src/test/java/org/elasticsearch/index/search/FieldDataTermsFilterTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.core.DoubleFieldMapper; import org.elasticsearch.index.mapper.core.LongFieldMapper; import org.elasticsearch.index.mapper.core.NumberFieldMapper; import org.elasticsearch.index.mapper.core.StringFieldMapper; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.After; @@ -78,7 +79,7 @@ public class FieldDataTermsFilterTests extends ElasticsearchTestCase { .build(new Mapper.BuilderContext(null, new ContentPath(1))); // create index and fielddata service - ifdService = new IndexFieldDataService(new Index("test")); + ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService()); writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, new StandardAnalyzer(Lucene.VERSION))); diff --git a/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java b/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java index 92d74456b96..474d312de93 100644 --- a/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java +++ b/src/test/java/org/elasticsearch/index/similarity/SimilarityTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.similarity; import org.apache.lucene.search.similarities.*; +import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.ImmutableSettings; @@ -33,6 +34,8 @@ import org.elasticsearch.index.codec.CodecModule; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperServiceModule; import org.elasticsearch.index.settings.IndexSettingsModule; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -40,7 +43,6 @@ import java.io.IOException; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; public class SimilarityTests extends ElasticsearchTestCase { @@ -163,6 +165,12 @@ public class SimilarityTests extends ElasticsearchTestCase { .add(new MapperServiceModule()) .add(new AnalysisModule(settings)) .add(new SimilarityModule(settings)) + .add(new AbstractModule() { + @Override + protected void configure() { + bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class); + } + }) .createInjector(); return injector.getInstance(SimilarityService.class); } diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java new file mode 100644 index 00000000000..72c86563c0a --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/fielddata/breaker/CircuitBreakerServiceTests.java @@ -0,0 +1,148 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.breaker; + +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +/** + * Integration tests for InternalCircuitBreakerService + */ +public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest { + + @Test + @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testMemoryBreaker() { + assertAcked(prepareCreate("cb-test", 1)); + final Client client = client(); + + try { + + // index some different terms so we have some field data for loading + int docCount = atLeast(300); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("cb-test", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("cb-test").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + try { + client.prepareSearch("cb-test").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + fail("should not reach this point"); + } catch (SearchPhaseExecutionException e) { + assert true; + } + + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } + + @Test + @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testRamAccountingTermsEnum() { + final Client client = client(); + + try { + + // Create an index where the mappings have a field data filter + client.admin().indices().prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " + + "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}").execute().actionGet(); + + // Wait 10 seconds for green + client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet(); + + // index some different terms so we have some field data for loading + int docCount = atLeast(300); + for (long id = 0; id < docCount; id++) { + client.prepareIndex("ramtest", "type", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + + // refresh + refresh(); + + // execute a search that loads field data (sorting on the "test" field) + client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + + // clear field data cache (thus setting the loaded field data back to 0) + client.admin().indices().prepareClearCache("ramtest").setFieldDataCache(true).execute().actionGet(); + + // Update circuit breaker settings + Settings settings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet(); + + // execute a search that loads field data (sorting on the "test" field) + // again, this time it should trip the breaker + try { + client.prepareSearch("ramtest").setSource("{\"sort\": \"test\",\"query\":{\"match_all\":{}}}") + .execute().actionGet(); + fail("should not reach this point"); + } catch (SearchPhaseExecutionException e) { + assert true; + } + + } finally { + // Reset settings + Settings resetSettings = settingsBuilder() + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1") + .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT) + .build(); + client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet(); + } + } +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java new file mode 100644 index 00000000000..62345d1d0af --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/fielddata/breaker/DummyCircuitBreakerService.java @@ -0,0 +1,47 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.breaker; + +import org.elasticsearch.common.breaker.MemoryCircuitBreaker; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.ByteSizeValue; + +/** + * Class that returns a breaker that never breaks + */ +public class DummyCircuitBreakerService implements CircuitBreakerService { + + private final ESLogger logger = Loggers.getLogger(DummyCircuitBreakerService.class); + + private final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(Long.MAX_VALUE), 0.0, logger); + + public DummyCircuitBreakerService() {} + + @Override + public MemoryCircuitBreaker getBreaker() { + return breaker; + } + + @Override + public FieldDataBreakerStats stats() { + return new FieldDataBreakerStats(-1, -1, 0); + } +} diff --git a/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java b/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java new file mode 100644 index 00000000000..219a96a234e --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/fielddata/breaker/RandomExceptionCircuitBreakerTests.java @@ -0,0 +1,235 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.breaker; + +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.util.English; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.engine.MockRobinEngine; +import org.elasticsearch.test.engine.ThrowingAtomicReaderWrapper; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.store.MockDirectoryHelper; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the circuit breaker while random exceptions are happening + */ +public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegrationTest { + + @Test + @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE") + public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException { + final int numShards = between(1, 5); + String mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("type") + .startObject("properties") + .startObject("test-str") + .field("type", "string") + .field("index", "not_analyzed") + .startObject("fielddata") + .field("format", randomBytesFieldDataFormat()) + .endObject() // fielddata + .endObject() // test-str + .startObject("test-num") + // I don't use randomNumericType() here because I don't want "byte", and I want "float" and "double" + .field("type", randomFrom(Arrays.asList("float", "long", "double", "short", "integer"))) + .startObject("fielddata") + .field("format", randomNumericFieldDataFormat()) + .endObject() // fielddata + .endObject() // test-num + .endObject() // properties + .endObject() // type + .endObject() // {} + .string(); + final double exceptionRate; + final double exceptionOnOpenRate; + if (frequently()) { + if (randomBoolean()) { + if (randomBoolean()) { + exceptionOnOpenRate = 1.0/between(5, 100); + exceptionRate = 0.0d; + } else { + exceptionRate = 1.0/between(5, 100); + exceptionOnOpenRate = 0.0d; + } + } else { + exceptionOnOpenRate = 1.0/between(5, 100); + exceptionRate = 1.0/between(5, 100); + } + } else { + // rarely no exception + exceptionRate = 0d; + exceptionOnOpenRate = 0d; + } + + ImmutableSettings.Builder settings = settingsBuilder() + .put("index.number_of_shards", numShards) + .put("index.number_of_replicas", randomIntBetween(0, 1)) + .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate) + .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate) + .put(MockDirectoryHelper.CHECK_INDEX_ON_CLOSE, true); + logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap()); + client().admin().indices().prepareCreate("test") + .setSettings(settings) + .addMapping("type", mapping).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster() + .health(Requests.clusterHealthRequest().waitForYellowStatus().timeout(TimeValue.timeValueSeconds(5))).get(); // it's OK to timeout here + final int numDocs; + if (clusterHealthResponse.isTimedOut()) { + /* some seeds just won't let you create the index at all and we enter a ping-pong mode + * trying one node after another etc. that is ok but we need to make sure we don't wait + * forever when indexing documents so we set numDocs = 1 and expect all shards to fail + * when we search below.*/ + logger.info("ClusterHealth timed out - only index one doc and expect searches to fail"); + numDocs = 1; + } else { + numDocs = between(10, 100); + } + for (int i = 0; i < numDocs ; i++) { + try { + client().prepareIndex("test", "type", "" + i) + .setTimeout(TimeValue.timeValueSeconds(1)).setSource("test-str", randomUnicodeOfLengthBetween(5, 25), "test-num", i).get(); + } catch (ElasticSearchException ex) { + } + } + logger.info("Start Refresh"); + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().get(); // don't assert on failures here + final boolean refreshFailed = refreshResponse.getShardFailures().length != 0 || refreshResponse.getFailedShards() != 0; + logger.info("Refresh failed: [{}] numShardsFailed: [{}], shardFailuresLength: [{}], successfulShards: [{}], totalShards: [{}] ", + refreshFailed, refreshResponse.getFailedShards(), refreshResponse.getShardFailures().length, + refreshResponse.getSuccessfulShards(), refreshResponse.getTotalShards()); + final int numSearches = atLeast(10); + + for (int i = 0; i < numSearches; i++) { + try { + // Sort by the string and numeric fields, to load them into field data + client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()) + .addSort("test-str", SortOrder.ASC) + .addSort("test-num", SortOrder.ASC).get(); + } catch (SearchPhaseExecutionException ex) { + logger.info("expected SearchPhaseException: [{}]", ex.getMessage()); + } + + if (frequently()) { + // Now, clear the cache and check that the circuit breaker has been + // successfully set back to zero. If there is a bug in the circuit + // breaker adjustment code, it should show up here by the breaker + // estimate being either positive or negative. + client().admin().indices().prepareClearCache("test").setFieldDataCache(true).execute().actionGet(); + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().all().execute().actionGet(); + for (NodeStats stats : resp.getNodes()) { + assertThat("Breaker reset to 0", stats.getBreaker().getEstimated(), equalTo(0L)); + } + } + } + } + + public static final String EXCEPTION_TOP_LEVEL_RATIO_KEY = "index.engine.exception.ratio.top"; + public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low"; + + // TODO: Generalize this class and add it as a utility + public static class RandomExceptionDirectoryReaderWrapper extends MockRobinEngine.DirectoryReaderWrapper { + private final Settings settings; + static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingAtomicReaderWrapper.Thrower { + private final Random random; + private final double topLevelRatio; + private final double lowLevelRatio; + + ThrowingSubReaderWrapper(Settings settings) { + final long seed = settings.getAsLong(ElasticsearchIntegrationTest.INDEX_SEED_SETTING, 0l); + this.topLevelRatio = settings.getAsDouble(EXCEPTION_TOP_LEVEL_RATIO_KEY, 0.1d); + this.lowLevelRatio = settings.getAsDouble(EXCEPTION_LOW_LEVEL_RATIO_KEY, 0.1d); + this.random = new Random(seed); + } + + @Override + public AtomicReader wrap(AtomicReader reader) { + return new ThrowingAtomicReaderWrapper(reader, this); + } + + @Override + public void maybeThrow(ThrowingAtomicReaderWrapper.Flags flag) throws IOException { + switch (flag) { + case Fields: + break; + case TermVectors: + break; + case Terms: + case TermsEnum: + if (random.nextDouble() < topLevelRatio) { + throw new IOException("Forced top level Exception on [" + flag.name() + "]"); + } + case Intersect: + break; + case Norms: + break; + case NumericDocValues: + break; + case BinaryDocValues: + break; + case SortedDocValues: + break; + case SortedSetDocValues: + break; + case DocsEnum: + case DocsAndPositionsEnum: + if (random.nextDouble() < lowLevelRatio) { + throw new IOException("Forced low level Exception on [" + flag.name() + "]"); + } + break; + } + + } + } + + public RandomExceptionDirectoryReaderWrapper(DirectoryReader in, Settings settings) { + super(in, new ThrowingSubReaderWrapper(settings)); + this.settings = settings; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) { + return new RandomExceptionDirectoryReaderWrapper(in, settings); + } + } +} diff --git a/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java b/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java index 0f0bd902e63..e0dd2829a17 100644 --- a/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java +++ b/src/test/java/org/elasticsearch/search/geo/GeoDistanceTests.java @@ -52,15 +52,11 @@ import static org.hamcrest.Matchers.*; */ public class GeoDistanceTests extends ElasticsearchIntegrationTest { - private static String randomFieldDataFormat() { - return randomFrom(Arrays.asList("array", "compressed", "doc_values")); - } - @Test public void simpleDistanceTests() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("location").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject() .endObject().endObject().string(); client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); @@ -212,7 +208,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { public void testDistanceSortingMVFields() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("locations").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject() .endObject().endObject().string(); client().admin().indices().prepareCreate("test") @@ -344,7 +340,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { public void testDistanceSortingWithMissingGeoPoint() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("locations").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject() .endObject().endObject().string(); client().admin().indices().prepareCreate("test") @@ -446,7 +442,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { .startObject("properties") .startObject("name").field("type", "string").endObject() .startObject("location").field("type", "geo_point").field("lat_lon", true) - .startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject() + .startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject() .endObject() .endObject() .endObject() @@ -621,7 +617,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest { .field("geohash_precision", 24) .field("lat_lon", true) .startObject("fielddata") - .field("format", randomFieldDataFormat()) + .field("format", randomNumericFieldDataFormat()) .endObject() .endObject() .endObject() diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 1473fa57cc6..504e7a5bead 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -880,4 +880,20 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return perTestRatio; } + /** + * Returns a random numeric field data format from the choices of "array", + * "compressed", or "doc_values". + */ + public static String randomNumericFieldDataFormat() { + return randomFrom(Arrays.asList("array", "compressed", "doc_values")); + } + + /** + * Returns a random bytes field data format from the choices of + * "paged_bytes", "fst", or "doc_values". + */ + public static String randomBytesFieldDataFormat() { + return randomFrom(Arrays.asList("paged_bytes", "fst", "doc_values")); + } + }