diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index 35e210f9535..8d038f5e40b 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -66,6 +66,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule { clusterDynamicSettings.addDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP + "*"); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_SIZE); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_EXPIRE, Validator.TIME); + clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, Validator.POSITIVE_INTEGER); clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE); clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java index d76ee7d629f..df3e9165650 100644 --- a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java @@ -26,6 +26,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import org.apache.lucene.search.DocIdSet; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -51,6 +52,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList private volatile String size; private volatile long sizeInBytes; private volatile TimeValue expire; + private volatile int concurrencyLevel; private final TimeValue cleanInterval; @@ -61,6 +63,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList public static final String INDICES_CACHE_FILTER_SIZE = "indices.cache.filter.size"; public static final String INDICES_CACHE_FILTER_EXPIRE = "indices.cache.filter.expire"; + public static final String INDICES_CACHE_FILTER_CONCURRENCY_LEVEL = "indices.cache.filter.concurrency_level"; class ApplySettings implements NodeSettingsService.Listener { @Override @@ -78,6 +81,15 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList IndicesFilterCache.this.expire = expire; replace = true; } + final int concurrencyLevel = settings.getAsInt(INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, IndicesFilterCache.this.concurrencyLevel); + if (concurrencyLevel <= 0) { + throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel); + } + if (!Objects.equal(concurrencyLevel, IndicesFilterCache.this.concurrencyLevel)) { + logger.info("updating [indices.cache.filter.concurrency_level] from [{}] to [{}]", IndicesFilterCache.this.concurrencyLevel, concurrencyLevel); + IndicesFilterCache.this.concurrencyLevel = concurrencyLevel; + replace = true; + } if (replace) { Cache oldCache = IndicesFilterCache.this.cache; computeSizeInBytes(); @@ -94,6 +106,10 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList this.size = componentSettings.get("size", "10%"); this.expire = componentSettings.getAsTime("expire", null); this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60)); + this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, 16); + if (concurrencyLevel <= 0) { + throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel); + } computeSizeInBytes(); buildCache(); logger.debug("using [node] weighted filter cache with size [{}], actual_size [{}], expire [{}], clean_interval [{}]", diff --git a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java index 069fa19ef8c..dec054202f0 100644 --- a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.search.SearchType; @@ -85,6 +86,7 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size"; public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire"; + public static final String INDICES_CACHE_QUERY_CONCURRENCY_LEVEL = "indices.cache.query.concurrency_level"; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -95,9 +97,12 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe final ConcurrentMap registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); final Set keysToClean = ConcurrentCollections.newConcurrentSet(); + //TODO make these changes configurable on the cluster level - private volatile String size; - private volatile TimeValue expire; + private final String size; + private final TimeValue expire; + private final int concurrencyLevel; + private volatile Cache cache; @Inject @@ -109,6 +114,11 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe // this cache can be very small yet still be very effective this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%"); this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null); + // defaults to 4, but this is a busy map for all indices, increase it a bit by default + this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_QUERY_CONCURRENCY_LEVEL, 16); + if (concurrencyLevel <= 0) { + throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel); + } buildCache(); this.reaper = new Reaper(); @@ -120,9 +130,7 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe CacheBuilder cacheBuilder = CacheBuilder.newBuilder() .maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this); - - // defaults to 4, but this is a busy map for all indices, increase it a bit - cacheBuilder.concurrencyLevel(16); + cacheBuilder.concurrencyLevel(concurrencyLevel); if (expire != null) { cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS); diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 3267ff9936a..4e901019b07 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.util.Accountable; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -53,6 +54,8 @@ import java.util.concurrent.TimeUnit; public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener { public static final String FIELDDATA_CLEAN_INTERVAL_SETTING = "indices.fielddata.cache.cleanup_interval"; + public static final String FIELDDATA_CACHE_CONCURRENCY_LEVEL = "indices.fielddata.cache.concurrency_level"; + private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; private final Cache cache; @@ -73,8 +76,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL if (sizeInBytes > 0) { cacheBuilder.maximumWeight(sizeInBytes).weigher(new FieldDataWeigher()); } - // defaults to 4, but this is a busy map for all indices, increase it a bit - cacheBuilder.concurrencyLevel(16); + // defaults to 4, but this is a busy map for all indices, increase it a bit by default + final int concurrencyLevel = settings.getAsInt(FIELDDATA_CACHE_CONCURRENCY_LEVEL, 16); + if (concurrencyLevel <= 0) { + throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel); + } + cacheBuilder.concurrencyLevel(concurrencyLevel); if (expire != null && expire.millis() > 0) { cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index def281a64d1..9ff42a2f880 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -96,7 +96,9 @@ import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.fs.FsTranslogFile; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cache.query.IndicesQueryCache; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.node.internal.InternalNode; @@ -491,6 +493,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase if (random.nextBoolean()) { builder.put(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, random.nextBoolean()); } + + if (random.nextBoolean()) { + builder.put(IndicesQueryCache.INDICES_CACHE_QUERY_CONCURRENCY_LEVEL, randomIntBetween(1, 32)); + builder.put(IndicesFieldDataCache.FIELDDATA_CACHE_CONCURRENCY_LEVEL, randomIntBetween(1, 32)); + builder.put(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, randomIntBetween(1, 32)); + } return builder; }