[CACHE] Expose concurrency_level on all caches

The concurrency level allows to configure the cache internal segments
used to cache data. This can have direct impact on evicition rates since
memory bound caches are equally divided into segments which can cause
early evictions if cache entries are not well balanced.

Relates to #7836
This commit is contained in:
Simon Willnauer 2014-10-16 16:34:44 +02:00
parent 347ce36654
commit 4a14c635c8
5 changed files with 47 additions and 7 deletions

View File

@ -66,6 +66,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_SIZE);
clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_EXPIRE, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE);
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);

View File

@ -26,6 +26,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.lucene.search.DocIdSet;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -51,6 +52,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
private volatile String size;
private volatile long sizeInBytes;
private volatile TimeValue expire;
private volatile int concurrencyLevel;
private final TimeValue cleanInterval;
@ -61,6 +63,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
public static final String INDICES_CACHE_FILTER_SIZE = "indices.cache.filter.size";
public static final String INDICES_CACHE_FILTER_EXPIRE = "indices.cache.filter.expire";
public static final String INDICES_CACHE_FILTER_CONCURRENCY_LEVEL = "indices.cache.filter.concurrency_level";
class ApplySettings implements NodeSettingsService.Listener {
@Override
@ -78,6 +81,15 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
IndicesFilterCache.this.expire = expire;
replace = true;
}
final int concurrencyLevel = settings.getAsInt(INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, IndicesFilterCache.this.concurrencyLevel);
if (concurrencyLevel <= 0) {
throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
}
if (!Objects.equal(concurrencyLevel, IndicesFilterCache.this.concurrencyLevel)) {
logger.info("updating [indices.cache.filter.concurrency_level] from [{}] to [{}]", IndicesFilterCache.this.concurrencyLevel, concurrencyLevel);
IndicesFilterCache.this.concurrencyLevel = concurrencyLevel;
replace = true;
}
if (replace) {
Cache<WeightedFilterCache.FilterCacheKey, DocIdSet> oldCache = IndicesFilterCache.this.cache;
computeSizeInBytes();
@ -94,6 +106,10 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
this.size = componentSettings.get("size", "10%");
this.expire = componentSettings.getAsTime("expire", null);
this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60));
this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, 16);
if (concurrencyLevel <= 0) {
throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
}
computeSizeInBytes();
buildCache();
logger.debug("using [node] weighted filter cache with size [{}], actual_size [{}], expire [{}], clean_interval [{}]",

View File

@ -26,6 +26,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchType;
@ -85,6 +86,7 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size";
public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire";
public static final String INDICES_CACHE_QUERY_CONCURRENCY_LEVEL = "indices.cache.query.concurrency_level";
private final ThreadPool threadPool;
private final ClusterService clusterService;
@ -95,9 +97,12 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
//TODO make these changes configurable on the cluster level
private volatile String size;
private volatile TimeValue expire;
private final String size;
private final TimeValue expire;
private final int concurrencyLevel;
private volatile Cache<Key, BytesReference> cache;
@Inject
@ -109,6 +114,11 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
// this cache can be very small yet still be very effective
this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%");
this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null);
// defaults to 4, but this is a busy map for all indices, increase it a bit by default
this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_QUERY_CONCURRENCY_LEVEL, 16);
if (concurrencyLevel <= 0) {
throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
}
buildCache();
this.reaper = new Reaper();
@ -120,9 +130,7 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
CacheBuilder<Key, BytesReference> cacheBuilder = CacheBuilder.newBuilder()
.maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this);
// defaults to 4, but this is a busy map for all indices, increase it a bit
cacheBuilder.concurrencyLevel(16);
cacheBuilder.concurrencyLevel(concurrencyLevel);
if (expire != null) {
cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@ -53,6 +54,8 @@ import java.util.concurrent.TimeUnit;
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable> {
public static final String FIELDDATA_CLEAN_INTERVAL_SETTING = "indices.fielddata.cache.cleanup_interval";
public static final String FIELDDATA_CACHE_CONCURRENCY_LEVEL = "indices.fielddata.cache.concurrency_level";
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
private final Cache<Key, Accountable> cache;
@ -73,8 +76,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
if (sizeInBytes > 0) {
cacheBuilder.maximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
}
// defaults to 4, but this is a busy map for all indices, increase it a bit
cacheBuilder.concurrencyLevel(16);
// defaults to 4, but this is a busy map for all indices, increase it a bit by default
final int concurrencyLevel = settings.getAsInt(FIELDDATA_CACHE_CONCURRENCY_LEVEL, 16);
if (concurrencyLevel <= 0) {
throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
}
cacheBuilder.concurrencyLevel(concurrencyLevel);
if (expire != null && expire.millis() > 0) {
cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
}

View File

@ -96,7 +96,9 @@ import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.index.translog.fs.FsTranslogFile;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.internal.InternalNode;
@ -491,6 +493,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
if (random.nextBoolean()) {
builder.put(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, random.nextBoolean());
}
if (random.nextBoolean()) {
builder.put(IndicesQueryCache.INDICES_CACHE_QUERY_CONCURRENCY_LEVEL, randomIntBetween(1, 32));
builder.put(IndicesFieldDataCache.FIELDDATA_CACHE_CONCURRENCY_LEVEL, randomIntBetween(1, 32));
builder.put(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, randomIntBetween(1, 32));
}
return builder;
}