Add field data memory circuit breaker.

This adds the field data circuit breaker, which is used to estimate
the amount of memory required to load field data before loading it. It
then raises a CircuitBreakingException if the limit is exceeded.

It is configured with two parameters:

`indices.fielddata.cache.breaker.limit` - the maximum number of bytes
of field data to be loaded before circuit breaking. Defaults to
`indices.fielddata.cache.size` if set, unbounded otherwise.

`indices.fielddata.cache.breaker.overhead` - a constant for all field
data estimations to be multiplied with before aggregation. Defaults to
1.03.

Both settings can be configured dynamically using the cluster update
settings API.
This commit is contained in:
Lee Hinman 2014-01-02 15:04:47 -07:00
parent 84565c2951
commit a754224751
51 changed files with 1835 additions and 123 deletions

View File

@ -23,7 +23,7 @@ second command selectively retrieves nodes stats of only `nodeId1` and
<<cluster-nodes,here>>.
By default, `indices` stats are returned. With options for `indices`,
`os`, `process`, `jvm`, `network`, `transport`, `http`, `fs`, and
`os`, `process`, `jvm`, `network`, `transport`, `http`, `fs`, `breaker`, and
`thread_pool`. For example:
[horizontal]
@ -60,6 +60,9 @@ By default, `indices` stats are returned. With options for `indices`,
Transport statistics about sent and received bytes in
cluster communication
`breaker`::
Statistics about the field data circuit breaker
`clear`::
Clears all the flags (first). Useful, if you only want to
retrieve specific stats.

View File

@ -237,9 +237,34 @@ The `frequency` and `regex` filters can be combined:
}
--------------------------------------------------
[float]
[[field-data-circuit-breaker]]
=== Field data circuit breaker
The field data circuit breaker allows Elasticsearch to estimate the amount of
memory a field will require to be loaded into memory. It can then prevent the
field data loading by raising an exception. By default it is configured with
no limit (-1 bytes), but is automatically set to `indices.fielddata.cache.size`
if set. It can be configured with the following parameters:
[cols="<,<",options="header",]
|=======================================================================
|Setting |Description
|`indices.fielddata.breaker.limit` |Maximum size of estimated field data
to allow loading. Defaults to `indices.fielddata.cache.size` if set, unbounded
if not.
|`indices.fielddata.breaker.overhead` |A constant that all field data
estimations are multiplied with to determine a final estimation. Defaults to
1.03
|=======================================================================
Both the `indices.fielddata.breaker.limit` and
`indices.fielddata.breaker.overhead` can be changed dynamically using the
cluster update settings API.
[float]
[[field-data-monitoring]]
=== Monitoring field data
You can monitor memory usage for field data using
You can monitor memory usage for field data as well as the field data circuit
breaker using
<<cluster-nodes-stats,Nodes Stats API>>

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.indices.fielddata.breaker.FieldDataBreakerStats;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
@ -76,12 +77,16 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
@Nullable
private HttpStats http;
@Nullable
private FieldDataBreakerStats breaker;
NodeStats() {
}
public NodeStats(DiscoveryNode node, long timestamp, @Nullable String hostname, @Nullable NodeIndicesStats indices,
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, @Nullable NetworkStats network,
@Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http) {
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool,
@Nullable NetworkStats network, @Nullable FsStats fs, @Nullable TransportStats transport, @Nullable HttpStats http,
@Nullable FieldDataBreakerStats breaker) {
super(node);
this.timestamp = timestamp;
this.hostname = hostname;
@ -94,6 +99,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
this.fs = fs;
this.transport = transport;
this.http = http;
this.breaker = breaker;
}
public long getTimestamp() {
@ -171,6 +177,11 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
return this.http;
}
/**
 * @return the field data circuit breaker stats carried by this object; may be {@code null}
 */
@Nullable
public FieldDataBreakerStats getBreaker() {
    return breaker;
}
public static NodeStats readNodeStats(StreamInput in) throws IOException {
NodeStats nodeInfo = new NodeStats();
nodeInfo.readFrom(in);
@ -211,6 +222,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
if (in.readBoolean()) {
http = HttpStats.readHttpStats(in);
}
breaker = FieldDataBreakerStats.readOptionalCircuitBreakerStats(in);
}
@Override
@ -277,6 +289,7 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
out.writeBoolean(true);
http.writeTo(out);
}
out.writeOptionalStreamable(breaker);
}
@Override
@ -326,6 +339,9 @@ public class NodeStats extends NodeOperationResponse implements ToXContent {
if (getHttp() != null) {
getHttp().toXContent(builder, params);
}
if (getBreaker() != null) {
getBreaker().toXContent(builder, params);
}
return builder;
}

View File

@ -40,6 +40,7 @@ public class NodesStatsRequest extends NodesOperationRequest<NodesStatsRequest>
private boolean fs;
private boolean transport;
private boolean http;
private boolean breaker;
protected NodesStatsRequest() {
}
@ -65,6 +66,7 @@ public class NodesStatsRequest extends NodesOperationRequest<NodesStatsRequest>
this.fs = true;
this.transport = true;
this.http = true;
this.breaker = true;
return this;
}
@ -81,6 +83,7 @@ public class NodesStatsRequest extends NodesOperationRequest<NodesStatsRequest>
this.fs = false;
this.transport = false;
this.http = false;
this.breaker = false;
return this;
}
@ -225,6 +228,18 @@ public class NodesStatsRequest extends NodesOperationRequest<NodesStatsRequest>
return this;
}
/**
 * Should the node's circuit breaker stats be returned.
 */
public boolean breaker() {
    return breaker;
}
/**
* Should the node's circuit breaker stats be returned.
*
* @param breaker {@code true} to ask for the circuit breaker stats, {@code false} to leave them out
* @return this request, so that calls can be chained
*/
public NodesStatsRequest breaker(boolean breaker) {
this.breaker = breaker;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -237,6 +252,7 @@ public class NodesStatsRequest extends NodesOperationRequest<NodesStatsRequest>
fs = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
breaker = in.readBoolean();
}
@Override
@ -251,6 +267,7 @@ public class NodesStatsRequest extends NodesOperationRequest<NodesStatsRequest>
out.writeBoolean(fs);
out.writeBoolean(transport);
out.writeBoolean(http);
out.writeBoolean(breaker);
}
}

View File

@ -97,7 +97,8 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
@Override
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) throws ElasticSearchException {
NodesStatsRequest request = nodeStatsRequest.request;
return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(), request.network(), request.fs(), request.transport(), request.http());
return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(), request.network(),
request.fs(), request.transport(), request.http(), request.breaker());
}
@Override

View File

@ -117,7 +117,7 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) throws ElasticSearchException {
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, false, true, false, true);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<ShardStats>();
for (String index : indicesService.indices()) {
IndexService indexService = indicesService.indexService(index);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
/**
@ -75,6 +76,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
}
public void addDynamicSettings(String... settings) {

View File

@ -0,0 +1,33 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.breaker;
import org.elasticsearch.ElasticSearchException;
/**
 * Signals that a circuit breaker has tripped.
 */
public class CircuitBreakingException extends ElasticSearchException {

    // TODO: maybe add more neat metrics here?

    /**
     * @param message description of why the breaker tripped
     */
    public CircuitBreakingException(final String message) {
        super(message);
    }
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.breaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A circuit breaker that aggregates byte estimations and trips, by throwing
 * a {@link CircuitBreakingException}, once a configurable memory limit
 * would be exceeded.
 */
public class MemoryCircuitBreaker {

    private final long memoryBytesLimit;
    private final double overheadConstant;
    private final AtomicLong used;
    private final ESLogger logger;

    /**
     * Builds a breaker that starts with 0 bytes used. Every estimation is
     * multiplied by {@code overheadConstant} before being compared against
     * the limit.
     *
     * @param limit            circuit breaker limit
     * @param overheadConstant constant multiplier for byte estimations
     * @param logger           logger that receives the breaker's trace/error output
     */
    public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, ESLogger logger) {
        this(limit, overheadConstant, null, logger);
    }

    /**
     * Builds a breaker that continues counting from a previous breaker.
     * Every estimation is multiplied by {@code overheadConstant} before
     * being compared against the limit.
     *
     * @param limit            circuit breaker limit
     * @param overheadConstant constant multiplier for byte estimations
     * @param oldBreaker       previous breaker whose "used" counter is taken over (the
     *                         counter object itself is shared, not copied); when
     *                         {@code null} the new breaker starts at 0 bytes
     * @param logger           logger that receives the breaker's trace/error output
     */
    public MemoryCircuitBreaker(ByteSizeValue limit, double overheadConstant, MemoryCircuitBreaker oldBreaker, ESLogger logger) {
        this.memoryBytesLimit = limit.bytes();
        this.overheadConstant = overheadConstant;
        this.used = (oldBreaker == null) ? new AtomicLong(0) : oldBreaker.used;
        this.logger = logger;
        if (logger.isTraceEnabled()) {
            logger.trace("Creating MemoryCircuitBreaker with a limit of {} bytes ({}) and a overhead constant of {}",
                    this.memoryBytesLimit, limit, this.overheadConstant);
        }
    }

    /**
     * Trips the breaker unconditionally.
     *
     * @throws CircuitBreakingException always, carrying the configured limit in its message
     */
    public void circuitBreak() throws CircuitBreakingException {
        throw new CircuitBreakingException("Data too large, data would be larger than limit of [" +
                memoryBytesLimit + "] bytes");
    }

    /**
     * Adds {@code bytes} to the aggregated estimate, tripping the breaker
     * when the new total, multiplied by the overhead constant, is above a
     * positive limit. A limit of 0 trips the breaker on every call. A
     * negative limit never trips it, but the bytes are still aggregated.
     *
     * @param bytes number of bytes to add to the breaker
     * @return number of "used" bytes so far
     * @throws CircuitBreakingException if the limit is 0 or would be exceeded
     */
    public double addEstimateBytesAndMaybeBreak(long bytes) throws CircuitBreakingException {
        // A limit of zero means no data is allowed at all
        if (memoryBytesLimit == 0) {
            circuitBreak();
        }

        // Without a limit (-1) there is nothing to check, so a plain
        // addAndGet() is enough and the retry loop below can be skipped;
        // this keeps the RamAccountingTermsEnum case faster.
        if (memoryBytesLimit == -1) {
            final long total = used.addAndGet(bytes);
            if (logger.isTraceEnabled()) {
                logger.trace("Adding [{}] to used bytes [new used: [{}], limit: [-1b]]",
                        new ByteSizeValue(bytes), new ByteSizeValue(total));
            }
            return total;
        }

        // Check first, then try to commit the addition. If the counter was
        // changed in the meantime the whole check is repeated, which may
        // produce extra trace output.
        while (true) {
            final long before = used.get();
            final long after = before + bytes;
            final long afterWithOverhead = (long) (after * overheadConstant);
            if (logger.isTraceEnabled()) {
                logger.trace("Adding [{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
                        new ByteSizeValue(bytes), new ByteSizeValue(after),
                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
                        afterWithOverhead, new ByteSizeValue(afterWithOverhead));
            }
            if (memoryBytesLimit > 0 && afterWithOverhead > memoryBytesLimit) {
                logger.error("New used memory {} [{}] would be larger than configured breaker: {} [{}], breaking",
                        afterWithOverhead, new ByteSizeValue(afterWithOverhead),
                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
                circuitBreak();
            }
            // Only commit if nobody changed the counter underneath us
            if (used.compareAndSet(before, after)) {
                return after;
            }
        }
    }

    /**
     * Adds an <b>exact</b> number of bytes without checking the limit and
     * without applying the overhead constant.
     *
     * @param bytes number of bytes to add to the breaker (may be negative)
     * @return number of "used" bytes so far
     */
    public long addWithoutBreaking(long bytes) {
        final long u = used.addAndGet(bytes);
        if (logger.isTraceEnabled()) {
            logger.trace("Adjusted breaker by [{}] bytes, now [{}]", bytes, u);
        }
        assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
        return u;
    }

    /**
     * @return the number of aggregated "used" bytes so far
     */
    public long getUsed() {
        return used.get();
    }

    /**
     * @return the maximum number of bytes before the circuit breaker will trip
     */
    public long getMaximum() {
        return memoryBytesLimit;
    }

    /**
     * @return the constant multiplier the breaker uses for aggregations
     */
    public double getOverhead() {
        return overheadConstant;
    }
}

View File

@ -2,6 +2,9 @@ package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
@ -9,6 +12,8 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import java.io.IOException;
/**
*/
public abstract class AbstractIndexFieldData<FD extends AtomicFieldData> extends AbstractIndexComponent implements IndexFieldData<FD> {
@ -53,4 +58,41 @@ public abstract class AbstractIndexFieldData<FD extends AtomicFieldData> extends
}
}
/**
 * Estimates the memory overhead of loading field data. A field data
 * implementation that wants to take advantage of the MemoryCircuitBreaker
 * should provide its own {@code PerValueEstimator}.
 * <p/>
 * Nothing invokes {@link #beforeLoad(Terms)} or
 * {@link #afterLoad(TermsEnum, long)} automatically; they must be called
 * manually.
 */
public interface PerValueEstimator {

    /**
     * @param term the term to estimate
     * @return the number of bytes for the given term
     */
    long bytesPerValue(BytesRef term);

    /**
     * Runs any estimation that has to happen before the terms are loaded.
     * An implementation may optionally wrap a {@link TermsEnum} in a
     * {@link RamAccountingTermsEnum}, which estimates the memory on a
     * per-term basis.
     *
     * @param terms terms to be estimated
     * @return A TermsEnum for the given terms
     * @throws IOException
     */
    TermsEnum beforeLoad(Terms terms) throws IOException;

    /**
     * Gives the estimator a chance to adjust a circuit breaker once the
     * field data has been loaded and its actual memory usage is known.
     *
     * @param termsEnum  terms that were loaded
     * @param actualUsed actual field data memory usage
     */
    void afterLoad(TermsEnum termsEnum, long actualUsed);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
*/
@ -170,7 +171,8 @@ public interface IndexFieldData<FD extends AtomicFieldData> extends IndexCompone
interface Builder {
IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache);
IndexFieldData build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService);
}
public interface WithOrdinals<FD extends AtomicFieldData.WithOrdinals> extends IndexFieldData<FD> {

View File

@ -36,6 +36,7 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -54,6 +55,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
private final static ImmutableMap<String, IndexFieldData.Builder> buildersByType;
private final static ImmutableMap<String, IndexFieldData.Builder> docValuesBuildersByType;
private final static ImmutableMap<Tuple<String, String>, IndexFieldData.Builder> buildersByTypeAndFormat;
private final CircuitBreakerService circuitBreakerService;
static {
buildersByType = MapBuilder.<String, IndexFieldData.Builder>newMapBuilder()
@ -123,14 +125,16 @@ public class IndexFieldDataService extends AbstractIndexComponent {
IndexService indexService;
// public for testing
public IndexFieldDataService(Index index) {
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS));
public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) {
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService);
}
@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache) {
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
super(index, indexSettings);
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
}
// we need to "inject" the index service to not create cyclic dep
@ -231,7 +235,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
fieldDataCaches.put(fieldNames.indexName(), cache);
}
fieldData = builder.build(index, indexSettings, mapper, cache);
fieldData = builder.build(index, indexSettings, mapper, cache, circuitBreakerService);
loadedFieldData.put(fieldNames.indexName(), fieldData);
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.FilteredTermsEnum;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.index.fielddata.AbstractIndexFieldData;
import java.io.IOException;
/**
 * {@link TermsEnum} that takes a MemoryCircuitBreaker, increasing the breaker
 * as terms are consumed through {@link #next()}. Per-term estimates are
 * buffered and handed to the breaker in batches.
 */
public final class RamAccountingTermsEnum extends FilteredTermsEnum {

    // Flush every 1mb
    private static final long FLUSH_BUFFER_SIZE = 1024 * 1024;

    private final MemoryCircuitBreaker breaker;
    private final TermsEnum termsEnum;
    private final AbstractIndexFieldData.PerValueEstimator estimator;

    // Bytes already reported to the breaker
    private long totalBytes;
    // Bytes estimated since the last flush, not yet reported to the breaker
    private long flushBuffer;

    /**
     * @param termsEnum the enum whose terms are accounted for
     * @param breaker   breaker that receives the aggregated estimates
     * @param estimator supplies the byte estimate for each term
     */
    public RamAccountingTermsEnum(TermsEnum termsEnum, MemoryCircuitBreaker breaker, AbstractIndexFieldData.PerValueEstimator estimator) {
        super(termsEnum);
        this.breaker = breaker;
        this.termsEnum = termsEnum;
        this.estimator = estimator;
        this.totalBytes = 0;
        this.flushBuffer = 0;
    }

    /**
     * Always accept the term.
     */
    @Override
    protected AcceptStatus accept(BytesRef term) throws IOException {
        return AcceptStatus.YES;
    }

    /**
     * Flush the {@code flushBuffer} to the breaker, incrementing the total
     * bytes and resetting the buffer. If the breaker throws, neither the
     * total nor the buffer is modified.
     */
    public void flush() {
        breaker.addEstimateBytesAndMaybeBreak(this.flushBuffer);
        this.totalBytes += this.flushBuffer;
        this.flushBuffer = 0;
    }

    /**
     * Proxy to the original next() call, but estimates the overhead of
     * loading the next term.
     *
     * @return the next term, or {@code null} once the wrapped enum is exhausted
     * @throws IOException if the wrapped enum fails
     */
    @Override
    public BytesRef next() throws IOException {
        BytesRef term = termsEnum.next();
        if (term == null) {
            // We have reached the end of the termsEnum: report whatever is
            // still buffered. The estimator is never asked about a null
            // term, regardless of whether the buffer happens to be empty.
            if (this.flushBuffer != 0) {
                flush();
            }
            return null;
        }
        this.flushBuffer += estimator.bytesPerValue(term);
        if (this.flushBuffer >= FLUSH_BUFFER_SIZE) {
            flush();
        }
        return term;
    }

    /**
     * @return the total number of bytes that have been flushed to the breaker so far
     */
    public long getTotalBytes() {
        return this.totalBytes;
    }
}

View File

@ -26,10 +26,13 @@ import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData;
import org.elasticsearch.index.fielddata.plain.PagedBytesAtomicFieldData;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -43,9 +46,12 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index
final ConcurrentMap<String, CounterMetric> perFieldTotals = ConcurrentCollections.newConcurrentMap();
private final CircuitBreakerService breakerService;
@Inject
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) {
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) {
super(shardId, indexSettings);
this.breakerService = breakerService;
}
public FieldDataStats stats(String... fields) {
@ -89,6 +95,10 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index
evictionsMetric.inc();
}
if (sizeInBytes != -1) {
// Since field data is being unloaded (due to expiration or manual
// clearing), we also need to decrement the used bytes in the breaker
breakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
totalMetric.dec(sizeInBytes);
String keyFieldName = fieldNames.indexName();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
* A field data implementation that forbids loading and will throw an {@link ElasticSearchIllegalStateException} if you try to load
@ -37,7 +38,9 @@ public final class DisabledIndexFieldData extends AbstractIndexFieldData<AtomicF
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<AtomicFieldData<?>> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
public IndexFieldData<AtomicFieldData<?>> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper,
IndexFieldDataCache cache, CircuitBreakerService breakerService) {
// Ignore Circuit Breaker
return new DisabledIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.util.Map;
import java.util.Set;
@ -77,7 +78,9 @@ public abstract class DocValuesIndexFieldData {
}
@Override
public IndexFieldData<?> build(Index index, Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
public IndexFieldData<?> build(Index index, Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService) {
// Ignore Circuit Breaker
final FieldMapper.Names fieldNames = mapper.names();
final Settings fdSettings = mapper.fieldDataType().getSettings();
final Map<String, Settings> filter = fdSettings.getGroups("filter");

View File

@ -35,21 +35,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs;
import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
*/
public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArrayAtomicFieldData> implements IndexNumericFieldData<DoubleArrayAtomicFieldData> {
private final CircuitBreakerService breakerService;
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
return new DoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache);
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService) {
return new DoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService);
}
}
public DoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) {
public DoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
this.breakerService = breakerService;
}
@Override
@ -69,8 +75,13 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArra
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
DoubleArrayAtomicFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
if (terms == null) {
return DoubleArrayAtomicFieldData.empty(reader.maxDoc());
data = DoubleArrayAtomicFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.getMemorySizeInBytes());
return data;
}
// TODO: how can we guess the number of terms? numerics end up creating more terms per value...
final BigDoubleArrayList values = new BigDoubleArrayList();
@ -78,6 +89,7 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArra
values.add(0); // first "t" indicates null value
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(reader.maxDoc(), acceptableTransientOverheadRatio);
boolean success = false;
try {
final BytesRefIterator iter = builder.buildFromTerms(getNumericType().wrapTermsEnum(terms.iterator(null)));
BytesRef term;
@ -89,12 +101,14 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArra
Docs ordinals = build.ordinals();
final FixedBitSet set = builder.buildDocsWithValuesSet();
// there's sweatspot where due to low unique value count, using ordinals will consume less memory
// there's sweet spot where due to low unique value count, using ordinals will consume less memory
long singleValuesArraySize = reader.maxDoc() * RamUsageEstimator.NUM_BYTES_DOUBLE + (set == null ? 0 : RamUsageEstimator.sizeOf(set.getBits()) + RamUsageEstimator.NUM_BYTES_INT);
long uniqueValuesArraySize = values.sizeInBytes();
long ordinalsSize = build.getMemorySizeInBytes();
if (uniqueValuesArraySize + ordinalsSize < singleValuesArraySize) {
return new DoubleArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
data = new DoubleArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
success = true;
return data;
}
int maxDoc = reader.maxDoc();
@ -104,17 +118,19 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArra
}
assert sValues.size() == maxDoc;
if (set == null) {
return new DoubleArrayAtomicFieldData.Single(sValues, maxDoc, ordinals.getNumOrds());
data = new DoubleArrayAtomicFieldData.Single(sValues, maxDoc, ordinals.getNumOrds());
} else {
return new DoubleArrayAtomicFieldData.SingleFixedSet(sValues, maxDoc, set, ordinals.getNumOrds());
data = new DoubleArrayAtomicFieldData.SingleFixedSet(sValues, maxDoc, set, ordinals.getNumOrds());
}
} else {
return new DoubleArrayAtomicFieldData.WithOrdinals(
values,
reader.maxDoc(),
build);
data = new DoubleArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
}
success = true;
return data;
} finally {
if (success) {
estimator.afterLoad(null, data.getMemorySizeInBytes());
}
builder.close();
}

View File

@ -35,21 +35,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
*/
public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData<FSTBytesAtomicFieldData> {
private final CircuitBreakerService breakerService;
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<FSTBytesAtomicFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
return new FSTBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache);
public IndexFieldData<FSTBytesAtomicFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper,
IndexFieldDataCache cache, CircuitBreakerService breakerService) {
return new FSTBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService);
}
}
FSTBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) {
FSTBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType,
IndexFieldDataCache cache, CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
this.breakerService = breakerService;
}
@Override
@ -57,8 +63,13 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData<FSTBytes
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
FSTBytesAtomicFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
if (terms == null) {
return FSTBytesAtomicFieldData.empty(reader.maxDoc());
data = FSTBytesAtomicFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.getMemorySizeInBytes());
return data;
}
PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton();
org.apache.lucene.util.fst.Builder<Long> fstBuilder = new org.apache.lucene.util.fst.Builder<Long>(INPUT_TYPE.BYTE1, outputs);
@ -72,6 +83,7 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData<FSTBytes
}
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(numTerms, reader.maxDoc(), acceptableTransientOverheadRatio);
boolean success = false;
try {
// we don't store an ord 0 in the FST since we could have an empty string in there and FST don't support
@ -92,8 +104,13 @@ public class FSTBytesIndexFieldData extends AbstractBytesIndexFieldData<FSTBytes
final Ordinals ordinals = builder.build(fieldDataType.getSettings());
return new FSTBytesAtomicFieldData(fst, ordinals);
data = new FSTBytesAtomicFieldData(fst, ordinals);
success = true;
return data;
} finally {
if (success) {
estimator.afterLoad(null, data.getMemorySizeInBytes());
}
builder.close();
}
}

View File

@ -35,21 +35,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs;
import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
*/
public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayAtomicFieldData> implements IndexNumericFieldData<FloatArrayAtomicFieldData> {
private final CircuitBreakerService breakerService;
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
return new FloatArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache);
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService) {
return new FloatArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService);
}
}
public FloatArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) {
public FloatArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
this.breakerService = breakerService;
}
@Override
@ -68,8 +74,13 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayA
public FloatArrayAtomicFieldData loadDirect(AtomicReaderContext context) throws Exception {
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
FloatArrayAtomicFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
if (terms == null) {
return FloatArrayAtomicFieldData.empty(reader.maxDoc());
data = FloatArrayAtomicFieldData.empty(reader.maxDoc());
estimator.afterLoad(null, data.getMemorySizeInBytes());
return data;
}
// TODO: how can we guess the number of terms? numerics end up creating more terms per value...
final BigFloatArrayList values = new BigFloatArrayList();
@ -78,6 +89,7 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayA
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(reader.maxDoc(), acceptableTransientOverheadRatio);
boolean success = false;
try {
BytesRefIterator iter = builder.buildFromTerms(getNumericType().wrapTermsEnum(terms.iterator(null)));
BytesRef term;
@ -89,12 +101,14 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayA
Docs ordinals = build.ordinals();
final FixedBitSet set = builder.buildDocsWithValuesSet();
// there's sweatspot where due to low unique value count, using ordinals will consume less memory
// there's sweet spot where due to low unique value count, using ordinals will consume less memory
long singleValuesArraySize = reader.maxDoc() * RamUsageEstimator.NUM_BYTES_FLOAT + (set == null ? 0 : RamUsageEstimator.sizeOf(set.getBits()) + RamUsageEstimator.NUM_BYTES_INT);
long uniqueValuesArraySize = values.sizeInBytes();
long ordinalsSize = build.getMemorySizeInBytes();
if (uniqueValuesArraySize + ordinalsSize < singleValuesArraySize) {
return new FloatArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
data = new FloatArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
success = true;
return data;
}
int maxDoc = reader.maxDoc();
@ -104,17 +118,19 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayA
}
assert sValues.size() == maxDoc;
if (set == null) {
return new FloatArrayAtomicFieldData.Single(sValues, maxDoc, ordinals.getNumOrds());
data = new FloatArrayAtomicFieldData.Single(sValues, maxDoc, ordinals.getNumOrds());
} else {
return new FloatArrayAtomicFieldData.SingleFixedSet(sValues, maxDoc, set, ordinals.getNumOrds());
data = new FloatArrayAtomicFieldData.SingleFixedSet(sValues, maxDoc, set, ordinals.getNumOrds());
}
} else {
return new FloatArrayAtomicFieldData.WithOrdinals(
values,
reader.maxDoc(),
build);
data = new FloatArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
}
success = true;
return data;
} finally {
if (success) {
estimator.afterLoad(null, data.getMemorySizeInBytes());
}
builder.close();
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Names;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.io.IOException;
@ -65,7 +66,9 @@ public class GeoPointBinaryDVIndexFieldData extends DocValuesIndexFieldData impl
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<?> build(Index index, Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
public IndexFieldData<?> build(Index index, Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService) {
// Ignore breaker
final FieldMapper.Names fieldNames = mapper.names();
return new GeoPointBinaryDVIndexFieldData(index, fieldNames);
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
*/
@ -45,11 +46,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
private static final String PRECISION_KEY = "precision";
private static final Distance DEFAULT_PRECISION_VALUE = new Distance(1, DistanceUnit.CENTIMETERS);
private final CircuitBreakerService breakerService;
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService) {
FieldDataType type = mapper.fieldDataType();
final String precisionAsString = type.getSettings().get(PRECISION_KEY);
final Distance precision;
@ -58,15 +61,18 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
} else {
precision = DEFAULT_PRECISION_VALUE;
}
return new GeoPointCompressedIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, precision);
return new GeoPointCompressedIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, precision, breakerService);
}
}
private final GeoPointFieldMapper.Encoding encoding;
public GeoPointCompressedIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, Distance precision) {
public GeoPointCompressedIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, Distance precision,
CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
this.encoding = GeoPointFieldMapper.Encoding.of(precision);
this.breakerService = breakerService;
}
@Override
@ -74,8 +80,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
if (terms == null) {
return new Empty(reader.maxDoc());
data = new Empty(reader.maxDoc());
estimator.afterLoad(null, data.getMemorySizeInBytes());
return data;
}
final long initialSize;
if (terms.size() >= 0) {
@ -88,6 +99,7 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
PagedMutable lon = new PagedMutable(initialSize, pageSize, encoding.numBitsPerCoordinate(), PackedInts.COMPACT);
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(terms.size(), reader.maxDoc(), acceptableTransientOverheadRatio);
boolean success = false;
try {
final GeoPointEnum iter = new GeoPointEnum(builder.buildFromTerms(terms.iterator(null)));
GeoPoint point;
@ -116,18 +128,23 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
}
FixedBitSet set = builder.buildDocsWithValuesSet();
if (set == null) {
return new GeoPointCompressedAtomicFieldData.Single(encoding, sLon, sLat, reader.maxDoc(), ordinals.getNumOrds());
data = new GeoPointCompressedAtomicFieldData.Single(encoding, sLon, sLat, reader.maxDoc(), ordinals.getNumOrds());
} else {
return new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds());
data = new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds());
}
} else {
if (lat.size() != build.getMaxOrd()) {
lat = lat.resize(build.getMaxOrd());
lon = lon.resize(build.getMaxOrd());
}
return new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build);
data = new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build);
}
success = true;
return data;
} finally {
if (success) {
estimator.afterLoad(null, data.getMemorySizeInBytes());
}
builder.close();
}

View File

@ -33,21 +33,27 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs;
import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
*/
public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFieldData {
private final CircuitBreakerService breakerService;
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
return new GeoPointDoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache);
public IndexFieldData<?> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache,
CircuitBreakerService breakerService) {
return new GeoPointDoubleArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService);
}
}
public GeoPointDoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) {
public GeoPointDoubleArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
this.breakerService = breakerService;
}
@Override
@ -55,8 +61,13 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
AtomicGeoPointFieldData data = null;
// TODO: Use an actual estimator to estimate before loading.
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
if (terms == null) {
return new Empty(reader.maxDoc());
data = new Empty(reader.maxDoc());
estimator.afterLoad(null, data.getMemorySizeInBytes());
return data;
}
final BigDoubleArrayList lat = new BigDoubleArrayList();
final BigDoubleArrayList lon = new BigDoubleArrayList();
@ -64,6 +75,7 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel
lon.add(0); // first "t" indicates null value
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(terms.size(), reader.maxDoc(), acceptableTransientOverheadRatio);
boolean success = false;
try {
final GeoPointEnum iter = new GeoPointEnum(builder.buildFromTerms(terms.iterator(null)));
GeoPoint point;
@ -85,16 +97,19 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel
}
FixedBitSet set = builder.buildDocsWithValuesSet();
if (set == null) {
return new GeoPointDoubleArrayAtomicFieldData.Single(sLon, sLat, reader.maxDoc(), ordinals.getNumOrds());
data = new GeoPointDoubleArrayAtomicFieldData.Single(sLon, sLat, reader.maxDoc(), ordinals.getNumOrds());
} else {
return new GeoPointDoubleArrayAtomicFieldData.SingleFixedSet(sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds());
data = new GeoPointDoubleArrayAtomicFieldData.SingleFixedSet(sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds());
}
} else {
return new GeoPointDoubleArrayAtomicFieldData.WithOrdinals(
lon, lat,
reader.maxDoc(), build);
data = new GeoPointDoubleArrayAtomicFieldData.WithOrdinals(lon, lat, reader.maxDoc(), build);
}
success = true;
return data;
} finally {
if (success) {
estimator.afterLoad(null, data.getMemorySizeInBytes());
}
builder.close();
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.index.fielddata.AbstractIndexFieldData;
import java.io.IOException;
/**
 * A {@link AbstractIndexFieldData.PerValueEstimator} that makes no estimate
 * ahead of loading. The only thing it does is add the reported size to the
 * circuit breaker once the field data has been loaded, which makes it usable
 * by field data implementations that have no pre-loading estimation yet.
 */
public class NonEstimatingEstimator implements AbstractIndexFieldData.PerValueEstimator {

    /** Breaker that receives the post-load adjustment. */
    private final MemoryCircuitBreaker breaker;

    NonEstimatingEstimator(MemoryCircuitBreaker breaker) {
        this.breaker = breaker;
    }

    /**
     * @param term ignored
     * @return always 0, since no per-term estimation is performed
     */
    @Override
    public long bytesPerValue(BytesRef term) {
        return 0L;
    }

    /**
     * @param terms ignored
     * @return always {@code null}; the terms are not wrapped
     */
    @Override
    public TermsEnum beforeLoad(Terms terms) throws IOException {
        return null;
    }

    /**
     * Adds the size of the loaded field data to the breaker through
     * {@code addWithoutBreaking}.
     *
     * @param termsEnum  unused, may be {@code null}
     * @param actualUsed number of bytes to add to the breaker
     */
    @Override
    public void afterLoad(@Nullable TermsEnum termsEnum, long actualUsed) {
        final long loadedBytes = actualUsed;
        breaker.addWithoutBreaking(loadedBytes);
    }
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.FixedBitSet;
@ -30,8 +31,10 @@ import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.RamAccountingTermsEnum;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.fieldcomparator.LongValuesComparatorSource;
import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
@ -40,7 +43,9 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals.Docs;
import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.io.IOException;
import java.util.EnumSet;
/**
@ -58,18 +63,23 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
}
@Override
public IndexFieldData<AtomicNumericFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
return new PackedArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, numericType);
public IndexFieldData<AtomicNumericFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper,
IndexFieldDataCache cache, CircuitBreakerService breakerService) {
return new PackedArrayIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, numericType, breakerService);
}
}
private final NumericType numericType;
private final CircuitBreakerService breakerService;
public PackedArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, NumericType numericType) {
public PackedArrayIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, NumericType numericType,
CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
Preconditions.checkNotNull(numericType);
Preconditions.checkArgument(EnumSet.of(NumericType.BYTE, NumericType.SHORT, NumericType.INT, NumericType.LONG).contains(numericType), getClass().getSimpleName() + " only supports integer types, not " + numericType);
this.numericType = numericType;
this.breakerService = breakerService;
}
@Override
@ -88,8 +98,12 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
public AtomicNumericFieldData loadDirect(AtomicReaderContext context) throws Exception {
AtomicReader reader = context.reader();
Terms terms = reader.terms(getFieldNames().indexName());
PackedArrayAtomicFieldData data = null;
PackedArrayEstimator estimator = new PackedArrayEstimator(breakerService.getBreaker(), getNumericType());
if (terms == null) {
return PackedArrayAtomicFieldData.empty(reader.maxDoc());
data = PackedArrayAtomicFieldData.empty(reader.maxDoc());
estimator.adjustForNoTerms(data.getMemorySizeInBytes());
return data;
}
// TODO: how can we guess the number of terms? numerics end up creating more terms per value...
// Lucene encodes numeric data so that the lexicographical (encoded) order matches the integer order so we know the sequence of
@ -98,8 +112,10 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(-1, reader.maxDoc(), acceptableTransientOverheadRatio);
TermsEnum termsEnum = estimator.beforeLoad(terms);
boolean success = false;
try {
BytesRefIterator iter = builder.buildFromTerms(getNumericType().wrapTermsEnum(terms.iterator(null)));
BytesRefIterator iter = builder.buildFromTerms(termsEnum);
BytesRef term;
assert !getNumericType().isFloatingPoint();
final boolean indexedAsLong = getNumericType().requiredBits() > 32;
@ -156,28 +172,38 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
final long ordinalsSize = build.getMemorySizeInBytes();
if (uniqueValuesSize + ordinalsSize < singleValuesSize) {
return new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
}
final PackedInts.Mutable sValues = PackedInts.getMutable(reader.maxDoc(), bitsRequired, acceptableOverheadRatio);
if (missingValue != 0) {
sValues.fill(0, sValues.size(), missingValue);
}
for (int i = 0; i < reader.maxDoc(); i++) {
final long ord = ordinals.getOrd(i);
if (ord != Ordinals.MISSING_ORDINAL) {
sValues.set(i, values.get(ord - 1) - minValue);
data = new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
} else {
final PackedInts.Mutable sValues = PackedInts.getMutable(reader.maxDoc(), bitsRequired, acceptableOverheadRatio);
if (missingValue != 0) {
sValues.fill(0, sValues.size(), missingValue);
}
for (int i = 0; i < reader.maxDoc(); i++) {
final long ord = ordinals.getOrd(i);
if (ord != Ordinals.MISSING_ORDINAL) {
sValues.set(i, values.get(ord - 1) - minValue);
}
}
if (set == null) {
data = new PackedArrayAtomicFieldData.Single(sValues, minValue, reader.maxDoc(), ordinals.getNumOrds());
} else {
data = new PackedArrayAtomicFieldData.SingleSparse(sValues, minValue, reader.maxDoc(), missingValue, ordinals.getNumOrds());
}
}
if (set == null) {
return new PackedArrayAtomicFieldData.Single(sValues, minValue, reader.maxDoc(), ordinals.getNumOrds());
} else {
return new PackedArrayAtomicFieldData.SingleSparse(sValues, minValue, reader.maxDoc(), missingValue, ordinals.getNumOrds());
}
} else {
return new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
data = new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
}
success = true;
return data;
} finally {
if (!success) {
// If something went wrong, unwind any current estimations we've made
estimator.afterLoad(termsEnum, 0);
} else {
// Adjust as usual, based on the actual size of the field data
estimator.afterLoad(termsEnum, data.getMemorySizeInBytes());
}
builder.close();
}
@ -187,4 +213,62 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, SortMode sortMode) {
return new LongValuesComparatorSource(this, missingValue, sortMode);
}
/**
 * Estimator for numeric field data. Loading iterates the terms through a
 * RamAccountingTermsEnum, and the breaker is corrected once loading has
 * finished.
 */
public class PackedArrayEstimator implements PerValueEstimator {

    /** Breaker that is handed to the accounting enum and adjusted after loading. */
    private final MemoryCircuitBreaker breaker;

    /** Numeric type whose required bit width drives the per-term estimate. */
    private final NumericType type;

    public PackedArrayEstimator(MemoryCircuitBreaker breaker, NumericType type) {
        this.breaker = breaker;
        this.type = type;
    }

    /**
     * @param term ignored; the estimate depends only on the numeric type
     * @return estimated bytes per term, derived from {@code type.requiredBits()}
     */
    @Override
    public long bytesPerValue(BytesRef term) {
        // bits / 10 is bits / 8 (bytes) scaled by an assumed 0.8 compression
        // ratio; the estimate never drops below 4 bytes.
        final long estimate = Math.max(type.requiredBits() / 10, 4);
        return estimate;
    }

    /**
     * @param terms terms of the field that is about to be loaded
     * @return the numeric terms enum of {@code terms}, wrapped in a RamAccountingTermsEnum
     * @throws IOException
     */
    @Override
    public TermsEnum beforeLoad(Terms terms) throws IOException {
        final TermsEnum numericTerms = type.wrapTermsEnum(terms.iterator(null));
        return new RamAccountingTermsEnum(numericTerms, breaker, this);
    }

    /**
     * Corrects the breaker by the difference between the actual usage and the
     * total reported by the RamAccountingTermsEnum.
     *
     * @param termsEnum  terms that were wrapped and loaded; must be a RamAccountingTermsEnum
     * @param actualUsed actual field data memory usage
     */
    @Override
    public void afterLoad(TermsEnum termsEnum, long actualUsed) {
        assert termsEnum instanceof RamAccountingTermsEnum;
        final RamAccountingTermsEnum accounting = (RamAccountingTermsEnum) termsEnum;
        final long estimatedBytes = accounting.getTotalBytes();
        // Equivalent to subtracting (estimated - actual) from the breaker.
        breaker.addWithoutBreaking(actualUsed - estimatedBytes);
    }

    /**
     * Adds the given size to the breaker for the case where no terms were
     * loaded but the field data occupies memory anyway, for instance when
     * ordinals are used.
     *
     * @param actualUsed bytes actually used
     */
    public void adjustForNoTerms(long actualUsed) {
        breaker.addWithoutBreaking(actualUsed);
    }
}
}

View File

@ -19,43 +19,55 @@
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.codecs.BlockTreeTermsReader;
import org.apache.lucene.index.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.RamAccountingTermsEnum;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.index.fielddata.ordinals.OrdinalsBuilder;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.io.IOException;
/**
*/
public class PagedBytesIndexFieldData extends AbstractBytesIndexFieldData<PagedBytesAtomicFieldData> {
private final CircuitBreakerService breakerService;
public static class Builder implements IndexFieldData.Builder {
@Override
public IndexFieldData<PagedBytesAtomicFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper, IndexFieldDataCache cache) {
return new PagedBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache);
public IndexFieldData<PagedBytesAtomicFieldData> build(Index index, @IndexSettings Settings indexSettings, FieldMapper<?> mapper,
IndexFieldDataCache cache, CircuitBreakerService breakerService) {
return new PagedBytesIndexFieldData(index, indexSettings, mapper.names(), mapper.fieldDataType(), cache, breakerService);
}
}
public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache) {
public PagedBytesIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
this.breakerService = breakerService;
}
@Override
public PagedBytesAtomicFieldData loadDirect(AtomicReaderContext context) throws Exception {
AtomicReader reader = context.reader();
PagedBytesEstimator estimator = new PagedBytesEstimator(context, breakerService.getBreaker());
Terms terms = reader.terms(getFieldNames().indexName());
if (terms == null) {
return PagedBytesAtomicFieldData.empty(reader.maxDoc());
PagedBytesAtomicFieldData emptyData = PagedBytesAtomicFieldData.empty(reader.maxDoc());
estimator.adjustForNoTerms(emptyData.getMemorySizeInBytes());
return emptyData;
}
final PagedBytes bytes = new PagedBytes(15);
@ -68,12 +80,22 @@ public class PagedBytesIndexFieldData extends AbstractBytesIndexFieldData<PagedB
} else {
numTerms = -1;
}
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat(
FilterSettingFields.ACCEPTABLE_TRANSIENT_OVERHEAD_RATIO, OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
OrdinalsBuilder builder = new OrdinalsBuilder(numTerms, reader.maxDoc(), acceptableTransientOverheadRatio);
// Wrap the context in an estimator and use it to either estimate
// the entire set, or wrap the TermsEnum so it can be calculated
// per-term
PagedBytesAtomicFieldData data = null;
TermsEnum termsEnum = estimator.beforeLoad(terms);
boolean success = false;
try {
// 0 is reserved for "unset"
bytes.copyUsingLengthPrefix(new BytesRef());
TermsEnum termsEnum = filter(terms, reader);
DocsEnum docsEnum = null;
for (BytesRef term = termsEnum.next(); term != null; term = termsEnum.next()) {
final long termOrd = builder.nextOrdinal();
@ -88,9 +110,149 @@ public class PagedBytesIndexFieldData extends AbstractBytesIndexFieldData<PagedB
PagedBytes.Reader bytesReader = bytes.freeze(true);
final Ordinals ordinals = builder.build(fieldDataType.getSettings());
return new PagedBytesAtomicFieldData(bytesReader, sizePointer, termOrdToBytesOffset, ordinals);
data = new PagedBytesAtomicFieldData(bytesReader, sizePointer, termOrdToBytesOffset, ordinals);
success = true;
return data;
} finally {
if (!success) {
// If something went wrong, unwind any current estimations we've made
estimator.afterLoad(termsEnum, 0);
} else {
// Call .afterLoad() to adjust the breaker now that we have an exact size
estimator.afterLoad(termsEnum, data.getMemorySizeInBytes());
}
builder.close();
}
}
/**
 * Circuit-breaker estimator for string (paged bytes) field data.
 *
 * When the field's terms are backed by a {@link BlockTreeTermsReader.FieldReader}
 * and no field data filter is configured, the whole term set is estimated up
 * front and registered with the breaker. In every other case the terms are
 * wrapped in a {@link RamAccountingTermsEnum} so the estimate is accumulated
 * term by term while loading.
 */
public class PagedBytesEstimator implements PerValueEstimator {

    private final AtomicReaderContext context;
    private final MemoryCircuitBreaker breaker;
    // Bytes registered with the breaker for this load; reconciled in afterLoad()
    private long estimatedBytes;

    PagedBytesEstimator(AtomicReaderContext context, MemoryCircuitBreaker breaker) {
        this.context = context;
        this.breaker = breaker;
    }

    /**
     * @return the estimated number of bytes for the term, derived from its length
     *         plus a fixed per-term overhead
     */
    public long bytesPerValue(BytesRef term) {
        // term bytes plus 64 bytes of miscellaneous overhead
        final long withOverhead = term.length + 64L;
        // roughly 1.5x compression per term/ord, plus 1 byte of wiggle room
        return (long) (withOverhead / 1.5d) + 1;
    }

    /**
     * @return the estimate for loading the entire term set into field data,
     *         or 0 if no estimate is available (unsupported terms implementation
     *         or a failure while reading the statistics)
     */
    public long estimateStringFieldData() {
        try {
            final AtomicReader segmentReader = context.reader();
            final Terms terms = segmentReader.terms(getFieldNames().indexName());
            final Fields fields = segmentReader.fields();
            final Terms fieldTerms = fields.terms(getFieldNames().indexName());

            // Statistics are only available from the block tree terms dictionary
            if (!(fieldTerms instanceof BlockTreeTermsReader.FieldReader)) {
                return 0;
            }

            final BlockTreeTermsReader.FieldReader blockTreeTerms = (BlockTreeTermsReader.FieldReader) fieldTerms;
            final long totalTermBytes = blockTreeTerms.computeStats().totalTermBytes;
            if (logger.isTraceEnabled()) {
                logger.trace("totalTermBytes: {}, terms.size(): {}, terms.getSumDocFreq(): {}",
                        totalTermBytes, terms.size(), terms.getSumDocFreq());
            }
            // term bytes + 2 bytes per term + 4 bytes per term/document pair
            return totalTermBytes + (2 * terms.size()) + (4 * terms.getSumDocFreq());
        } catch (Exception e) {
            logger.warn("Unable to estimate memory overhead", e);
            return 0;
        }
    }

    /**
     * Decide how the upcoming load is accounted for. If an up-front estimate
     * is possible it is added to the breaker (which may trip); otherwise the
     * returned enum is a {@link RamAccountingTermsEnum} that accounts per term.
     *
     * @param terms terms to be estimated
     * @return the (filtered) TermsEnum for the terms, possibly wrapped for per-term accounting
     * @throws IOException
     */
    public TermsEnum beforeLoad(Terms terms) throws IOException {
        final float transientOverheadRatio = fieldDataType.getSettings().getAsFloat(
                FilterSettingFields.ACCEPTABLE_TRANSIENT_OVERHEAD_RATIO,
                OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO);
        final AtomicReader segmentReader = context.reader();

        // The up-front estimate is only meaningful when all terms are loaded with
        // default ordinals settings, i.e. none of the following holds:
        // - the OrdinalsBuilder overhead has been tweaked away from the default
        // - a frequency filter is present
        // - a regex filter is present
        final boolean customLoading =
                transientOverheadRatio != OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO ||
                fieldDataType.getSettings().getAsDouble(FilterSettingFields.FREQUENCY_MIN, 0d) != 0d ||
                fieldDataType.getSettings().getAsDouble(FilterSettingFields.FREQUENCY_MAX, 0d) != 0d ||
                fieldDataType.getSettings().getAsDouble(FilterSettingFields.FREQUENCY_MIN_SEGMENT_SIZE, 0d) != 0d ||
                fieldDataType.getSettings().get(FilterSettingFields.REGEX_PATTERN) != null;

        if (customLoading) {
            if (logger.isTraceEnabled()) {
                logger.trace("Filter exists, can't circuit break normally, using RamAccountingTermsEnum");
            }
            return new RamAccountingTermsEnum(filter(terms, segmentReader), breaker, this);
        }

        estimatedBytes = this.estimateStringFieldData();
        if (estimatedBytes == 0) {
            // No estimate available, fall back to per-term accounting
            return new RamAccountingTermsEnum(filter(terms, segmentReader), breaker, this);
        }

        breaker.addEstimateBytesAndMaybeBreak(estimatedBytes);
        return filter(terms, segmentReader);
    }

    /**
     * Reconcile the breaker once loading has finished: the difference between
     * what was estimated and what is actually used is applied to the breaker.
     * For a {@link RamAccountingTermsEnum} the estimate is the total it
     * accumulated; otherwise it is the up-front estimate from
     * {@link #beforeLoad(Terms)}.
     *
     * @param termsEnum  terms that were loaded
     * @param actualUsed actual field data memory usage
     */
    public void afterLoad(TermsEnum termsEnum, long actualUsed) {
        if (termsEnum instanceof RamAccountingTermsEnum) {
            final RamAccountingTermsEnum accountingEnum = (RamAccountingTermsEnum) termsEnum;
            estimatedBytes = accountingEnum.getTotalBytes();
        }
        final long correction = actualUsed - estimatedBytes;
        breaker.addWithoutBreaking(correction);
    }

    /**
     * Register memory with the breaker for field data that holds no terms but
     * still occupies space.
     *
     * @param actualUsed bytes actually used
     */
    public void adjustForNoTerms(long actualUsed) {
        breaker.addWithoutBreaking(actualUsed);
    }
}
/**
 * Names of the field data settings that influence how terms are loaded.
 * Constants holder only; not meant to be instantiated.
 */
static final class FilterSettingFields {
    /** Transient overhead ratio accepted while building ordinals. */
    static final String ACCEPTABLE_TRANSIENT_OVERHEAD_RATIO = "acceptable_transient_overhead_ratio";
    /** Lower bound of the frequency filter. */
    static final String FREQUENCY_MIN = "filter.frequency.min";
    /** Upper bound of the frequency filter. */
    static final String FREQUENCY_MAX = "filter.frequency.max";
    /** Minimum segment size setting of the frequency filter. */
    static final String FREQUENCY_MIN_SEGMENT_SIZE = "filter.frequency.min_segment_size";
    /** Pattern of the regex filter. */
    static final String REGEX_PATTERN = "filter.regex.pattern";

    private FilterSettingFields() {
        // constants only, prevent instantiation
    }
}
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
@ -77,5 +79,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
/**
 * Contract for services that hand out a circuit breaker to the classes
 * loading field data, and that report on that breaker's state.
 */
public interface CircuitBreakerService {

    /**
     * Access the breaker against which memory estimates are registered.
     *
     * @return the breaker to register estimates with
     */
    MemoryCircuitBreaker getBreaker();

    /**
     * Report the state of the breaker.
     *
     * @return statistics describing the breaker
     */
    FieldDataBreakerStats stats();
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
 * Snapshot of the field data circuit breaker: its configured maximum, the
 * currently estimated usage and the overhead constant. Can be streamed
 * between nodes and rendered as XContent.
 */
public class FieldDataBreakerStats implements Streamable, ToXContent {

    private long maximum;
    private long estimated;
    private double overhead;

    /** Creates an empty instance to be populated through {@link #readFrom(StreamInput)}. */
    FieldDataBreakerStats() {
    }

    /**
     * @param maximum   maximum size of the breaker, in bytes
     * @param estimated currently estimated usage, in bytes
     * @param overhead  overhead constant of the breaker
     */
    public FieldDataBreakerStats(long maximum, long estimated, double overhead) {
        this.maximum = maximum;
        this.estimated = estimated;
        this.overhead = overhead;
    }

    /** @return the maximum size, in bytes */
    public long getMaximum() {
        return maximum;
    }

    /** @return the estimated usage, in bytes */
    public long getEstimated() {
        return estimated;
    }

    /** @return the overhead constant */
    public double getOverhead() {
        return overhead;
    }

    /**
     * Read stats that were written as an optional streamable.
     *
     * @return whatever {@code readOptionalStreamable} yields for a fresh instance
     */
    public static FieldDataBreakerStats readOptionalCircuitBreakerStats(StreamInput in) throws IOException {
        return in.readOptionalStreamable(new FieldDataBreakerStats());
    }

    // Wire format: maximum (long), estimated (long), overhead (double), in that order
    @Override
    public void readFrom(StreamInput in) throws IOException {
        this.maximum = in.readLong();
        this.estimated = in.readLong();
        this.overhead = in.readDouble();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeLong(this.maximum);
        out.writeLong(this.estimated);
        out.writeDouble(this.overhead);
    }

    /**
     * Renders the stats as a {@code fielddata_breaker} object containing the raw
     * byte values, their {@link ByteSizeValue} representation and the overhead.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(Fields.BREAKER)
                .field(Fields.MAX, maximum)
                .field(Fields.MAX_HUMAN, new ByteSizeValue(maximum))
                .field(Fields.ESTIMATED, estimated)
                .field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated))
                .field(Fields.OVERHEAD, overhead)
                .endObject();
        return builder;
    }

    /** Field names used in the XContent output. */
    static final class Fields {
        static final XContentBuilderString BREAKER = new XContentBuilderString("fielddata_breaker");
        static final XContentBuilderString MAX = new XContentBuilderString("maximum_size_in_bytes");
        static final XContentBuilderString MAX_HUMAN = new XContentBuilderString("maximum_size");
        static final XContentBuilderString ESTIMATED = new XContentBuilderString("estimated_size_in_bytes");
        static final XContentBuilderString ESTIMATED_HUMAN = new XContentBuilderString("estimated_size");
        static final XContentBuilderString OVERHEAD = new XContentBuilderString("overhead");
    }
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.node.settings.NodeSettingsService;
/**
 * The InternalCircuitBreakerService handles providing
 * {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker}s
 * that can be used to keep track of memory usage across the node, preventing
 * actions that could cause an {@link OutOfMemoryError} on the node.
 *
 * The limit and overhead are read from the node settings at construction time
 * and can be changed afterwards through the {@link NodeSettingsService};
 * a change replaces the current breaker with a new one.
 */
public class InternalCircuitBreakerService extends AbstractLifecycleComponent<InternalCircuitBreakerService> implements CircuitBreakerService {

    public static final String CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
    public static final String CIRCUIT_BREAKER_OVERHEAD_SETTING = "indices.fielddata.breaker.overhead";

    public static final double DEFAULT_OVERHEAD_CONSTANT = 1.03;

    private volatile MemoryCircuitBreaker breaker;
    private volatile long maxBytes;
    private volatile double overhead;

    /**
     * @param settings            node settings holding the initial limit and overhead
     * @param nodeSettingsService service notifying about dynamic settings updates
     * @param fieldDataCache      field data cache whose configured size is the default limit
     */
    @Inject
    public InternalCircuitBreakerService(Settings settings, NodeSettingsService nodeSettingsService, IndicesFieldDataCache fieldDataCache) {
        super(settings);
        // The limit defaults to the configured field data cache size
        long fieldDataMax = fieldDataCache.computeSizeInBytes();
        this.maxBytes = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, new ByteSizeValue(fieldDataMax)).bytes();
        this.overhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, DEFAULT_OVERHEAD_CONSTANT);

        this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, null, logger);

        nodeSettingsService.addListener(new ApplySettings());
    }

    /**
     * Applies dynamic updates of the limit and overhead settings. The breaker
     * is only replaced (and the change only logged) when a value actually
     * differs from the one currently in effect.
     */
    class ApplySettings implements NodeSettingsService.Listener {
        @Override
        public void onRefreshSettings(Settings settings) {
            boolean breakerResetNeeded = false;

            final long oldMaxBytes = InternalCircuitBreakerService.this.maxBytes;
            ByteSizeValue newMaxByteSizeValue = settings.getAsBytesSize(CIRCUIT_BREAKER_MAX_BYTES_SETTING, null);
            // A refresh may carry the setting unchanged; only react to a real change
            if (newMaxByteSizeValue != null && newMaxByteSizeValue.bytes() != oldMaxBytes) {
                logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_MAX_BYTES_SETTING,
                        new ByteSizeValue(oldMaxBytes), newMaxByteSizeValue);
                InternalCircuitBreakerService.this.maxBytes = newMaxByteSizeValue.bytes();
                breakerResetNeeded = true;
            }

            final double oldOverhead = InternalCircuitBreakerService.this.overhead;
            double newOverhead = settings.getAsDouble(CIRCUIT_BREAKER_OVERHEAD_SETTING, oldOverhead);
            if (newOverhead != oldOverhead) {
                logger.info("updating [{}] from [{}] to [{}]", CIRCUIT_BREAKER_OVERHEAD_SETTING,
                        oldOverhead, newOverhead);
                InternalCircuitBreakerService.this.overhead = newOverhead;
                breakerResetNeeded = true;
            }

            if (breakerResetNeeded) {
                resetBreaker();
            }
        }
    }

    /**
     * @return a {@link org.elasticsearch.common.breaker.MemoryCircuitBreaker} that can be used for aggregating memory usage
     */
    @Override
    public MemoryCircuitBreaker getBreaker() {
        return this.breaker;
    }

    /**
     * Reset the breaker by replacing it with a new one built from the current
     * limit and overhead. The previous breaker is handed to the new one's
     * constructor so it can be pre-populated from it.
     */
    public synchronized void resetBreaker() {
        final MemoryCircuitBreaker oldBreaker = this.breaker;
        // discard old breaker by creating a new one and pre-populating from the current breaker
        this.breaker = new MemoryCircuitBreaker(new ByteSizeValue(maxBytes), overhead, oldBreaker, logger);
    }

    /**
     * @return stats of the breaker currently in effect
     */
    @Override
    public FieldDataBreakerStats stats() {
        // Read the volatile field once so all three values come from the same breaker
        // instance even if resetBreaker() runs concurrently
        final MemoryCircuitBreaker current = this.breaker;
        return new FieldDataBreakerStats(current.getMaximum(), current.getUsed(), current.getOverhead());
    }

    @Override
    protected void doStart() throws ElasticSearchException {
    }

    @Override
    protected void doStop() throws ElasticSearchException {
    }

    @Override
    protected void doClose() throws ElasticSearchException {
    }
}

View File

@ -23,16 +23,14 @@ import com.google.common.cache.*;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
@ -47,6 +45,8 @@ import java.util.concurrent.TimeUnit;
*/
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, AtomicFieldData> {
private static final long JVM_HEAP_MAX_BYTES = JvmInfo.jvmInfo().getMem().getHeapMax().bytes();
Cache<Key, AtomicFieldData> cache;
private volatile String size;
@ -59,7 +59,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
super(settings);
this.size = componentSettings.get("size", "-1");
this.expire = componentSettings.getAsTime("expire", null);
computeSizeInBytes();
this.sizeInBytes = computeSizeInBytes();
buildCache();
}
@ -78,14 +78,17 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
cache = cacheBuilder.build();
}
private void computeSizeInBytes() {
/**
* @return the maximum configured size for the field data cache, in bytes, or -1 if not set
*/
public long computeSizeInBytes() {
if (size.equals("-1")) {
sizeInBytes = -1;
return -1;
} else if (size.endsWith("%")) {
double percent = Double.parseDouble(size.substring(0, size.length() - 1));
sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
return (long) ((percent / 100) * JVM_HEAP_MAX_BYTES);
} else {
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
return ByteSizeValue.parseBytesSizeValue(size).bytes();
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -55,6 +56,8 @@ public class NodeService extends AbstractComponent {
private final PluginsService pluginService;
private final CircuitBreakerService circuitBreakerService;
@Nullable
private HttpServer httpServer;
@ -70,7 +73,7 @@ public class NodeService extends AbstractComponent {
@Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
TransportService transportService, IndicesService indicesService,
PluginsService pluginService, Version version) {
PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -84,6 +87,7 @@ public class NodeService extends AbstractComponent {
}
this.version = version;
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
}
public void setHttpServer(@Nullable HttpServer httpServer) {
@ -156,11 +160,13 @@ public class NodeService extends AbstractComponent {
monitorService.networkService().stats(),
monitorService.fsService().stats(),
transportService.stats(),
httpServer == null ? null : httpServer.stats()
httpServer == null ? null : httpServer.stats(),
circuitBreakerService.stats()
);
}
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean fs, boolean transport, boolean http) {
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network,
boolean fs, boolean transport, boolean http, boolean circuitBreaker) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname,
@ -172,7 +178,8 @@ public class NodeService extends AbstractComponent {
network ? monitorService.networkService().stats() : null,
fs ? monitorService.fsService().stats() : null,
transport ? transportService.stats() : null,
http ? (httpServer == null ? null : httpServer.stats()) : null
http ? (httpServer == null ? null : httpServer.stats()) : null,
circuitBreaker ? circuitBreakerService.stats() : null
);
}
}

View File

@ -116,6 +116,12 @@ public class RestNodesStatsAction extends BaseRestHandler {
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/http", httpHandler);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/http/stats", httpHandler);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/http/stats", httpHandler);
RestBreakerHandler breakerHandler = new RestBreakerHandler();
controller.registerHandler(RestRequest.Method.GET, "/_nodes/stats/breaker", breakerHandler);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/breaker", breakerHandler);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/breaker/stats", breakerHandler);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/breaker/stats", breakerHandler);
}
@Override
@ -141,6 +147,7 @@ public class RestNodesStatsAction extends BaseRestHandler {
nodesStatsRequest.fs(request.paramAsBoolean("fs", nodesStatsRequest.fs()));
nodesStatsRequest.transport(request.paramAsBoolean("transport", nodesStatsRequest.transport()));
nodesStatsRequest.http(request.paramAsBoolean("http", nodesStatsRequest.http()));
nodesStatsRequest.breaker(request.paramAsBoolean("breaker", nodesStatsRequest.breaker()));
executeNodeStats(request, channel, nodesStatsRequest);
}
@ -264,4 +271,13 @@ public class RestNodesStatsAction extends BaseRestHandler {
executeNodeStats(request, channel, nodesStatsRequest);
}
}
/**
 * Handles the breaker-only node stats endpoints: builds a node stats request
 * for the requested node ids with every flag cleared except the breaker one.
 */
class RestBreakerHandler implements RestHandler {
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel) {
        final String[] nodeIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
        final NodesStatsRequest breakerOnlyRequest = new NodesStatsRequest(nodeIds);
        // Drop all stats sections, then ask for the circuit breaker section only
        breakerOnlyRequest.clear().breaker(true);
        executeNodeStats(request, channel, breakerOnlyRequest);
    }
}
}

View File

@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.index.query.functionscore.FunctionScoreModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.aggregations.AggregationModule;
import org.elasticsearch.search.controller.SearchPhaseController;

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import java.util.Random;
@ -135,7 +136,7 @@ public class LongFieldDataBenchmark {
indexWriter.close();
final DirectoryReader dr = DirectoryReader.open(dir);
final IndexFieldDataService fds = new IndexFieldDataService(new Index("dummy"));
final IndexFieldDataService fds = new IndexFieldDataService(new Index("dummy"), new DummyCircuitBreakerService());
final LongFieldMapper mapper = new LongFieldMapper.Builder(fieldName).build(new BuilderContext(null, new ContentPath(1)));
final IndexNumericFieldData<AtomicNumericFieldData> fd = fds.getForField(mapper);
final long start = System.nanoTime();

View File

@ -0,0 +1,108 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.breaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
/**
 * Tests for the Memory Aggregating Circuit Breaker
 */
public class MemoryCircuitBreakerTests extends ElasticsearchTestCase {

    /**
     * Several threads each add single bytes to a breaker whose limit is one
     * byte below the combined total, so the breaker is expected to trip
     * exactly once.
     */
    @Test
    public void testThreadedUpdatesToBreaker() throws Exception {
        final int NUM_THREADS = 5;
        final int BYTES_PER_THREAD = 1000;
        final Thread[] threads = new Thread[NUM_THREADS];
        final AtomicBoolean tripped = new AtomicBoolean(false);
        final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null);

        final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue((BYTES_PER_THREAD * NUM_THREADS) - 1), 1.0, logger);

        for (int i = 0; i < NUM_THREADS; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < BYTES_PER_THREAD; j++) {
                        try {
                            breaker.addEstimateBytesAndMaybeBreak(1L);
                        } catch (CircuitBreakingException e) {
                            // Record the first trip atomically. This must not live inside an
                            // `assert`, which is skipped when assertions are disabled. A second
                            // trip is reported through lastException because an AssertionError
                            // thrown on this worker thread would never reach the test thread.
                            if (!tripped.compareAndSet(false, true)) {
                                lastException.set(new AssertionError("tripped too many times"));
                            }
                        } catch (Throwable e2) {
                            lastException.set(e2);
                        }
                    }
                }
            });

            threads[i].start();
        }

        for (Thread t : threads) {
            t.join();
        }

        assertThat("no other exceptions were thrown", lastException.get(), equalTo(null));
        assertThat("breaker was tripped exactly once", tripped.get(), equalTo(true));
    }

    /**
     * Exercises a breaker created with a limit of 15 bytes and an overhead
     * constant of 1.6: additions made without breaking always count, while
     * estimated additions are rejected once the breaker trips.
     */
    @Test
    public void testConstantFactor() throws Exception {
        final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker(new ByteSizeValue(15), 1.6, logger);

        // add only 7 bytes
        breaker.addWithoutBreaking(7);

        try {
            // this won't actually add it because it trips the breaker
            breaker.addEstimateBytesAndMaybeBreak(3);
            fail("should never reach this");
        } catch (CircuitBreakingException cbe) {
            // expected
        }

        // shouldn't throw an exception
        breaker.addEstimateBytesAndMaybeBreak(2);

        assertThat(breaker.getUsed(), equalTo(9L));

        // adding 3 more bytes (now at 12)
        breaker.addWithoutBreaking(3);

        try {
            // Adding no bytes still breaks
            breaker.addEstimateBytesAndMaybeBreak(0);
            fail("should never reach this");
        } catch (CircuitBreakingException cbe) {
            // expected
        }
    }
}

View File

@ -46,6 +46,8 @@ import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
@ -83,6 +85,7 @@ public class IndexAliasesServiceTests extends ElasticsearchTestCase {
@Override
protected void configure() {
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
}
).createInjector();

View File

@ -34,6 +34,7 @@ import org.apache.lucene.codecs.memory.MemoryPostingsFormat;
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
import org.apache.lucene.codecs.pulsing.Pulsing41PostingsFormat;
import org.apache.lucene.codecs.simpletext.SimpleTextCodec;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -52,6 +53,8 @@ import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.Before;
import org.junit.Test;
@ -414,6 +417,12 @@ public class CodecTests extends ElasticsearchLuceneTestCase {
.add(new CodecModule(settings))
.add(new MapperServiceModule())
.add(new AnalysisModule(settings))
.add(new AbstractModule() {
@Override
protected void configure() {
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
})
.createInjector();
return injector.getInstance(CodecService.class);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.MapperBuilders;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.After;
import org.junit.Before;
@ -76,7 +77,7 @@ public abstract class AbstractFieldDataTests extends ElasticsearchTestCase {
@Before
public void setup() throws Exception {
ifdService = new IndexFieldDataService(new Index("test"));
ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService());
// LogByteSizeMP to preserve doc ID order
writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, new StandardAnalyzer(Lucene.VERSION)).setMergePolicy(new LogByteSizeMergePolicy()));
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.MapperBuilders;
import org.elasticsearch.index.mapper.core.*;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.Arrays;
@ -49,7 +50,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase {
@SuppressWarnings("unchecked")
public void testGetForFieldDefaults() {
final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"));
final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService());
for (boolean docValues : Arrays.asList(true, false)) {
final BuilderContext ctx = new BuilderContext(null, new ContentPath(1));
final StringFieldMapper stringMapper = new StringFieldMapper.Builder("string").tokenized(false).fieldDataSettings(docValues ? DOC_VALUES_SETTINGS : ImmutableSettings.EMPTY).build(ctx);
@ -98,7 +99,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase {
@SuppressWarnings("unchecked")
public void testByPassDocValues() {
final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"));
final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService());
final BuilderContext ctx = new BuilderContext(null, new ContentPath(1));
final StringFieldMapper stringMapper = MapperBuilders.stringField("string").tokenized(false).fieldDataSettings(DOC_VALUES_SETTINGS).fieldDataSettings(ImmutableSettings.builder().put("format", "fst").build()).build(ctx);
ifdService.clear();
@ -129,7 +130,7 @@ public class IndexFieldDataServiceTests extends ElasticsearchTestCase {
}
public void testChangeFieldDataFormat() throws Exception {
final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"));
final IndexFieldDataService ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService());
final BuilderContext ctx = new BuilderContext(null, new ContentPath(1));
final StringFieldMapper mapper1 = MapperBuilders.stringField("s").tokenized(false).fieldDataSettings(ImmutableSettings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx);
final IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(TEST_VERSION_CURRENT, new KeywordAnalyzer()));

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityLookupService;
import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
/**
*
@ -58,7 +59,7 @@ public class MapperTestUtils {
}
public static MapperService newMapperService(Index index, Settings indexSettings) {
return new MapperService(index, indexSettings, new Environment(), newAnalysisService(), new IndexFieldDataService(index),
return new MapperService(index, indexSettings, new Environment(), newAnalysisService(), new IndexFieldDataService(index, new DummyCircuitBreakerService()),
new PostingsFormatService(index), new DocValuesFormatService(index), newSimilarityLookupService());
}

View File

@ -61,6 +61,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
@ -118,6 +120,7 @@ public class SimpleIndexQueryParserTests extends ElasticsearchTestCase {
@Override
protected void configure() {
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
}
).createInjector();

View File

@ -40,6 +40,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
@ -84,6 +86,7 @@ public class IndexQueryParserModuleTests extends ElasticsearchTestCase {
@Override
protected void configure() {
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
}
).createInjector();

View File

@ -41,6 +41,8 @@ import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
@ -82,6 +84,7 @@ public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase {
@Override
protected void configure() {
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
}
).createInjector();

View File

@ -41,12 +41,13 @@ import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
@ -91,6 +92,7 @@ public class IndexQueryParserPluginTests extends ElasticsearchTestCase {
@Override
protected void configure() {
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
}
).createInjector();

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.core.DoubleFieldMapper;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.index.mapper.core.NumberFieldMapper;
import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.After;
@ -78,7 +79,7 @@ public class FieldDataTermsFilterTests extends ElasticsearchTestCase {
.build(new Mapper.BuilderContext(null, new ContentPath(1)));
// create index and fielddata service
ifdService = new IndexFieldDataService(new Index("test"));
ifdService = new IndexFieldDataService(new Index("test"), new DummyCircuitBreakerService());
writer = new IndexWriter(new RAMDirectory(),
new IndexWriterConfig(Lucene.VERSION, new StandardAnalyzer(Lucene.VERSION)));

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.similarity;
import org.apache.lucene.search.similarities.*;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -33,6 +34,8 @@ import org.elasticsearch.index.codec.CodecModule;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.DummyCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
@ -40,7 +43,6 @@ import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
public class SimilarityTests extends ElasticsearchTestCase {
@ -163,6 +165,12 @@ public class SimilarityTests extends ElasticsearchTestCase {
.add(new MapperServiceModule())
.add(new AnalysisModule(settings))
.add(new SimilarityModule(settings))
.add(new AbstractModule() {
@Override
protected void configure() {
bind(CircuitBreakerService.class).to(DummyCircuitBreakerService.class);
}
})
.createInjector();
return injector.getInstance(SimilarityService.class);
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/**
 * Integration tests for InternalCircuitBreakerService.
 *
 * Both tests run the same scenario against a different index: load field data
 * once with the breaker at its current settings, clear the field data cache,
 * lower the breaker limit to 100 bytes and check that loading the same field
 * data again fails the search. The breaker settings are reset afterwards so
 * that the transient cluster settings changed here are not left behind.
 */
public class CircuitBreakerServiceTests extends ElasticsearchIntegrationTest {

    /** Search body sorting on the "test" field; the tests use it to load field data. */
    private static final String SORT_ON_TEST_FIELD_SOURCE = "{\"sort\": \"test\",\"query\":{\"match_all\":{}}}";

    /**
     * Runs the breaker scenario against an index created with default mappings.
     */
    @Test
    @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
    public void testMemoryBreaker() {
        assertAcked(prepareCreate("cb-test", 1));
        final Client client = client();
        try {
            runBreakerScenario(client, "cb-test");
        } finally {
            restoreBreakerDefaults(client);
        }
    }

    /**
     * Runs the breaker scenario against an index whose "test" field has a
     * regex field data filter configured in its mapping.
     */
    @Test
    @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
    public void testRamAccountingTermsEnum() {
        final Client client = client();
        try {
            // Create an index where the mappings have a field data filter
            client.admin().indices().prepareCreate("ramtest").setSource("{\"mappings\": {\"type\": {\"properties\": {\"test\": " +
                    "{\"type\": \"string\",\"fielddata\": {\"filter\": {\"regex\": {\"pattern\": \"^value.*\"}}}}}}}}").execute().actionGet();

            // Wait up to 10 seconds for green
            client.admin().cluster().prepareHealth("ramtest").setWaitForGreenStatus().setTimeout("10s").execute().actionGet();

            runBreakerScenario(client, "ramtest");
        } finally {
            restoreBreakerDefaults(client);
        }
    }

    /**
     * Indexes documents with distinct "test" values into {@code index}, loads
     * the field data once, clears it, lowers the breaker limit and then
     * expects the same search to fail with a SearchPhaseExecutionException.
     *
     * @param client client used for every request
     * @param index  name of an already created index
     */
    private void runBreakerScenario(Client client, String index) {
        // index some different terms so we have some field data for loading
        int docCount = atLeast(300);
        for (long id = 0; id < docCount; id++) {
            client.prepareIndex(index, "type", Long.toString(id))
                    .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
        }

        // refresh
        refresh();

        // execute a search that loads field data (sorting on the "test" field)
        client.prepareSearch(index).setSource(SORT_ON_TEST_FIELD_SOURCE).execute().actionGet();

        // clear field data cache (thus setting the loaded field data back to 0)
        client.admin().indices().prepareClearCache(index).setFieldDataCache(true).execute().actionGet();

        // Update circuit breaker settings
        Settings settings = settingsBuilder()
                .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "100b")
                .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, 1.05)
                .build();
        client.admin().cluster().prepareUpdateSettings().setTransientSettings(settings).execute().actionGet();

        // execute a search that loads field data (sorting on the "test" field)
        // again, this time it should trip the breaker
        try {
            client.prepareSearch(index).setSource(SORT_ON_TEST_FIELD_SOURCE).execute().actionGet();
            fail("should not reach this point");
        } catch (SearchPhaseExecutionException expected) {
            // expected: the search failed once the breaker limit was lowered
        }
    }

    /**
     * Resets the breaker limit to "-1" and the overhead to
     * InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT.
     *
     * @param client client used to update the transient cluster settings
     */
    private void restoreBreakerDefaults(Client client) {
        Settings resetSettings = settingsBuilder()
                .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, "-1")
                .put(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, InternalCircuitBreakerService.DEFAULT_OVERHEAD_CONSTANT)
                .build();
        client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings).execute().actionGet();
    }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeValue;
/**
 * {@link CircuitBreakerService} implementation handing out a breaker that
 * never breaks.
 *
 * The breaker is created with a limit of {@code Long.MAX_VALUE} bytes and
 * 0.0 as its second constructor argument (presumably the overhead
 * multiplier; confirm against MemoryCircuitBreaker).
 */
public class DummyCircuitBreakerService implements CircuitBreakerService {

    private final MemoryCircuitBreaker breaker;

    /**
     * Builds the single breaker instance this service returns.
     */
    public DummyCircuitBreakerService() {
        final ESLogger breakerLogger = Loggers.getLogger(DummyCircuitBreakerService.class);
        final ByteSizeValue unlimited = new ByteSizeValue(Long.MAX_VALUE);
        this.breaker = new MemoryCircuitBreaker(unlimited, 0.0, breakerLogger);
    }

    /**
     * @return the same breaker instance on every call
     */
    @Override
    public MemoryCircuitBreaker getBreaker() {
        return this.breaker;
    }

    /**
     * @return a fresh stats object built from the fixed values (-1, -1, 0)
     */
    @Override
    public FieldDataBreakerStats stats() {
        return new FieldDataBreakerStats(-1, -1, 0);
    }
}

View File

@ -0,0 +1,235 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.breaker;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.util.English;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.engine.MockRobinEngine;
import org.elasticsearch.test.engine.ThrowingAtomicReaderWrapper;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
 * Tests for the circuit breaker while random exceptions are happening.
 *
 * The test creates an index with randomly chosen IO exception rates, indexes
 * and searches on a best-effort basis, and after clearing the field data
 * cache checks that every node reports a breaker estimate of exactly 0.
 */
public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegrationTest {

    /**
     * Sorts on a string and a numeric field while IO exceptions are injected,
     * then verifies that clearing the field data cache brings the breaker
     * estimate back to 0 on all nodes.
     *
     * @throws IOException          if building the mapping fails
     * @throws InterruptedException if waiting on a cluster response is interrupted
     * @throws ExecutionException   if a cluster request retrieved via get() fails
     */
    @Test
    @TestLogging("org.elasticsearch.indices.fielddata.breaker:TRACE,org.elasticsearch.index.fielddata:TRACE,org.elasticsearch.common.breaker:TRACE")
    public void testBreakerWithRandomExceptions() throws IOException, InterruptedException, ExecutionException {
        final int numShards = between(1, 5);

        String mapping = XContentFactory.jsonBuilder()
                .startObject()
                .startObject("type")
                .startObject("properties")
                .startObject("test-str")
                .field("type", "string")
                .field("index", "not_analyzed")
                .startObject("fielddata")
                .field("format", randomBytesFieldDataFormat())
                .endObject() // fielddata
                .endObject() // test-str
                .startObject("test-num")
                // I don't use randomNumericType() here because I don't want "byte", and I want "float" and "double"
                .field("type", randomFrom(Arrays.asList("float", "long", "double", "short", "integer")))
                .startObject("fielddata")
                .field("format", randomNumericFieldDataFormat())
                .endObject() // fielddata
                .endObject() // test-num
                .endObject() // properties
                .endObject() // type
                .endObject() // {}
                .string();

        // Pick the failure rates: only on open, only on other IO, both, or (rarely) none.
        final double exceptionRate;
        final double exceptionOnOpenRate;
        if (frequently()) {
            if (randomBoolean()) {
                if (randomBoolean()) {
                    exceptionOnOpenRate = 1.0/between(5, 100);
                    exceptionRate = 0.0d;
                } else {
                    exceptionRate = 1.0/between(5, 100);
                    exceptionOnOpenRate = 0.0d;
                }
            } else {
                exceptionOnOpenRate = 1.0/between(5, 100);
                exceptionRate = 1.0/between(5, 100);
            }
        } else {
            // rarely no exception
            exceptionRate = 0d;
            exceptionOnOpenRate = 0d;
        }

        ImmutableSettings.Builder settings = settingsBuilder()
                .put("index.number_of_shards", numShards)
                .put("index.number_of_replicas", randomIntBetween(0, 1))
                .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
                .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate)
                .put(MockDirectoryHelper.CHECK_INDEX_ON_CLOSE, true);
        logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
        client().admin().indices().prepareCreate("test")
                .setSettings(settings)
                .addMapping("type", mapping).execute().actionGet();
        ClusterHealthResponse clusterHealthResponse = client().admin().cluster()
                .health(Requests.clusterHealthRequest().waitForYellowStatus().timeout(TimeValue.timeValueSeconds(5))).get(); // it's OK to timeout here
        final int numDocs;
        if (clusterHealthResponse.isTimedOut()) {
            /* some seeds just won't let you create the index at all and we enter a ping-pong mode
             * trying one node after another etc. that is ok but we need to make sure we don't wait
             * forever when indexing documents so we set numDocs = 1 and expect all shards to fail
             * when we search below.*/
            logger.info("ClusterHealth timed out - only index one doc and expect searches to fail");
            numDocs = 1;
        } else {
            numDocs = between(10, 100);
        }
        for (int i = 0; i < numDocs ; i++) {
            try {
                client().prepareIndex("test", "type", "" + i)
                        .setTimeout(TimeValue.timeValueSeconds(1)).setSource("test-str", randomUnicodeOfLengthBetween(5, 25), "test-num", i).get();
            } catch (ElasticSearchException ignored) {
                // Deliberately ignored: indexing is best effort here, individual
                // documents are allowed to fail and the test carries on with the rest.
            }
        }
        logger.info("Start Refresh");
        RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().get(); // don't assert on failures here
        final boolean refreshFailed = refreshResponse.getShardFailures().length != 0 || refreshResponse.getFailedShards() != 0;
        logger.info("Refresh failed: [{}] numShardsFailed: [{}], shardFailuresLength: [{}], successfulShards: [{}], totalShards: [{}] ",
                refreshFailed, refreshResponse.getFailedShards(), refreshResponse.getShardFailures().length,
                refreshResponse.getSuccessfulShards(), refreshResponse.getTotalShards());
        final int numSearches = atLeast(10);

        for (int i = 0; i < numSearches; i++) {
            try {
                // Sort by the string and numeric fields, to load them into field data
                client().prepareSearch().setQuery(QueryBuilders.matchAllQuery())
                        .addSort("test-str", SortOrder.ASC)
                        .addSort("test-num", SortOrder.ASC).get();
            } catch (SearchPhaseExecutionException ex) {
                logger.info("expected SearchPhaseException: [{}]", ex.getMessage());
            }

            if (frequently()) {
                // Now, clear the cache and check that the circuit breaker has been
                // successfully set back to zero. If there is a bug in the circuit
                // breaker adjustment code, it should show up here by the breaker
                // estimate being either positive or negative.
                client().admin().indices().prepareClearCache("test").setFieldDataCache(true).execute().actionGet();
                NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().all().execute().actionGet();
                for (NodeStats stats : resp.getNodes()) {
                    assertThat("Breaker reset to 0", stats.getBreaker().getEstimated(), equalTo(0L));
                }
            }
        }
    }

    /** Setting key read by the reader wrapper below for the Terms/TermsEnum failure ratio. */
    public static final String EXCEPTION_TOP_LEVEL_RATIO_KEY = "index.engine.exception.ratio.top";
    /** Setting key read by the reader wrapper below for the DocsEnum/DocsAndPositionsEnum failure ratio. */
    public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";

    // TODO: Generalize this class and add it as a utility
    /**
     * Directory reader wrapper whose sub readers randomly throw IOExceptions.
     *
     * NOTE(review): the test above does not put this class or the two ratio
     * keys into the index settings it builds; presumably the wrapper has to be
     * enabled through engine settings elsewhere - confirm.
     */
    public static class RandomExceptionDirectoryReaderWrapper extends MockRobinEngine.DirectoryReaderWrapper {
        private final Settings settings;

        /**
         * Wraps every atomic reader in a ThrowingAtomicReaderWrapper and acts
         * as its thrower, using a Random seeded from the index settings.
         */
        static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingAtomicReaderWrapper.Thrower {
            private final Random random;
            private final double topLevelRatio;
            private final double lowLevelRatio;

            /**
             * @param settings source of the seed (default 0) and of both
             *                 failure ratios (default 0.1 each)
             */
            ThrowingSubReaderWrapper(Settings settings) {
                final long seed = settings.getAsLong(ElasticsearchIntegrationTest.INDEX_SEED_SETTING, 0L);
                this.topLevelRatio = settings.getAsDouble(EXCEPTION_TOP_LEVEL_RATIO_KEY, 0.1d);
                this.lowLevelRatio = settings.getAsDouble(EXCEPTION_LOW_LEVEL_RATIO_KEY, 0.1d);
                this.random = new Random(seed);
            }

            @Override
            public AtomicReader wrap(AtomicReader reader) {
                return new ThrowingAtomicReaderWrapper(reader, this);
            }

            /**
             * Throws with probability topLevelRatio for Terms/TermsEnum and
             * with probability lowLevelRatio for DocsEnum/DocsAndPositionsEnum;
             * every other flag is a no-op.
             *
             * @param flag the kind of reader access about to happen
             * @throws IOException the injected failure
             */
            @Override
            public void maybeThrow(ThrowingAtomicReaderWrapper.Flags flag) throws IOException {
                switch (flag) {
                    case Terms:
                    case TermsEnum:
                        if (random.nextDouble() < topLevelRatio) {
                            throw new IOException("Forced top level Exception on [" + flag.name() + "]");
                        }
                        // explicit break: do not rely on falling through into a no-op case
                        break;
                    case DocsEnum:
                    case DocsAndPositionsEnum:
                        if (random.nextDouble() < lowLevelRatio) {
                            throw new IOException("Forced low level Exception on [" + flag.name() + "]");
                        }
                        break;
                    case Fields:
                    case TermVectors:
                    case Intersect:
                    case Norms:
                    case NumericDocValues:
                    case BinaryDocValues:
                    case SortedDocValues:
                    case SortedSetDocValues:
                        // no failures are injected for these accesses
                        break;
                }
            }
        }

        /**
         * @param in       reader to wrap
         * @param settings settings handed to the throwing sub reader wrapper
         */
        public RandomExceptionDirectoryReaderWrapper(DirectoryReader in, Settings settings) {
            super(in, new ThrowingSubReaderWrapper(settings));
            this.settings = settings;
        }

        @Override
        protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
            return new RandomExceptionDirectoryReaderWrapper(in, settings);
        }
    }
}

View File

@ -52,15 +52,11 @@ import static org.hamcrest.Matchers.*;
*/
public class GeoDistanceTests extends ElasticsearchIntegrationTest {
private static String randomFieldDataFormat() {
return randomFrom(Arrays.asList("array", "compressed", "doc_values"));
}
@Test
public void simpleDistanceTests() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("location").field("type", "geo_point").field("lat_lon", true)
.startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject()
.startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject()
.endObject().endObject().string();
client().admin().indices().prepareCreate("test").addMapping("type1", mapping).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
@ -212,7 +208,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest {
public void testDistanceSortingMVFields() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("locations").field("type", "geo_point").field("lat_lon", true)
.startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject()
.startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject()
.endObject().endObject().string();
client().admin().indices().prepareCreate("test")
@ -344,7 +340,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest {
public void testDistanceSortingWithMissingGeoPoint() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("locations").field("type", "geo_point").field("lat_lon", true)
.startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject().endObject()
.startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject().endObject()
.endObject().endObject().string();
client().admin().indices().prepareCreate("test")
@ -446,7 +442,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest {
.startObject("properties")
.startObject("name").field("type", "string").endObject()
.startObject("location").field("type", "geo_point").field("lat_lon", true)
.startObject("fielddata").field("format", randomFieldDataFormat()).endObject().endObject()
.startObject("fielddata").field("format", randomNumericFieldDataFormat()).endObject().endObject()
.endObject()
.endObject()
.endObject()
@ -621,7 +617,7 @@ public class GeoDistanceTests extends ElasticsearchIntegrationTest {
.field("geohash_precision", 24)
.field("lat_lon", true)
.startObject("fielddata")
.field("format", randomFieldDataFormat())
.field("format", randomNumericFieldDataFormat())
.endObject()
.endObject()
.endObject()

View File

@ -880,4 +880,20 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return perTestRatio;
}
/**
 * Picks one numeric field data format at random.
 *
 * @return one of "array", "compressed" or "doc_values"
 */
public static String randomNumericFieldDataFormat() {
    final String[] numericFormats = {"array", "compressed", "doc_values"};
    return randomFrom(Arrays.asList(numericFormats));
}
/**
 * Picks one bytes field data format at random.
 *
 * @return one of "paged_bytes", "fst" or "doc_values"
 */
public static String randomBytesFieldDataFormat() {
    final String[] bytesFormats = {"paged_bytes", "fst", "doc_values"};
    return randomFrom(Arrays.asList(bytesFormats));
}
}