allow to change indices.cache.filter.size setting through cluster update settings api
This commit is contained in:
parent
6e81fbc30d
commit
ef47308ead
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.cache.filter.support.AbstractWeightedFilterCache;
|
||||
import org.elasticsearch.index.cache.filter.support.FilterCacheValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -38,23 +39,28 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public class IndicesNodeFilterCache extends AbstractComponent implements EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private ConcurrentMap<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> cache;
|
||||
|
||||
private volatile String size;
|
||||
private volatile long sizeInBytes;
|
||||
|
||||
private final CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>> evictionListeners =
|
||||
new CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>>();
|
||||
|
||||
@Inject public IndicesNodeFilterCache(Settings settings, ThreadPool threadPool) {
|
||||
@Inject public IndicesNodeFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.size = componentSettings.get("size", "20%");
|
||||
computeSizeInBytes();
|
||||
buildCache();
|
||||
logger.debug("using [node] filter cache with size [{}], actual_size [{}]", size, new ByteSizeValue(sizeInBytes));
|
||||
|
||||
String size = componentSettings.get("size", "20%");
|
||||
if (size.endsWith("%")) {
|
||||
double percent = Double.parseDouble(size.substring(0, size.length() - 1));
|
||||
sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
|
||||
} else {
|
||||
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
|
||||
}
|
||||
nodeSettingsService.addListener(new ApplySettings());
|
||||
}
|
||||
|
||||
private void buildCache() {
|
||||
TimeValue catchupTime = componentSettings.getAsTime("catchup", TimeValue.timeValueSeconds(10));
|
||||
|
||||
int weightedSize = (int) Math.min(sizeInBytes / AbstractWeightedFilterCache.FilterCacheValueWeigher.FACTOR, Integer.MAX_VALUE);
|
||||
|
@ -63,10 +69,17 @@ public class IndicesNodeFilterCache extends AbstractComponent implements Evictio
|
|||
.maximumWeightedCapacity(weightedSize)
|
||||
.weigher(new AbstractWeightedFilterCache.FilterCacheValueWeigher())
|
||||
.listener(this)
|
||||
.catchup(threadPool.scheduler(), catchupTime.millis(), TimeUnit.MILLISECONDS)
|
||||
.catchup(this.threadPool.scheduler(), catchupTime.millis(), TimeUnit.MILLISECONDS)
|
||||
.build();
|
||||
}
|
||||
|
||||
logger.debug("using [node] filter cache with size [{}]", new ByteSizeValue(sizeInBytes));
|
||||
private void computeSizeInBytes() {
|
||||
if (size.endsWith("%")) {
|
||||
double percent = Double.parseDouble(size.substring(0, size.length() - 1));
|
||||
sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
|
||||
} else {
|
||||
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
|
||||
}
|
||||
}
|
||||
|
||||
public void addEvictionListener(EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> listener) {
|
||||
|
@ -90,4 +103,18 @@ public class IndicesNodeFilterCache extends AbstractComponent implements Evictio
|
|||
listener.onEviction(filterCacheKey, docSetFilterCacheValue);
|
||||
}
|
||||
}
|
||||
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override public void onRefreshSettings(Settings settings) {
|
||||
String size = settings.get("indices.cache.filter.size", IndicesNodeFilterCache.this.size);
|
||||
if (!size.equals(IndicesNodeFilterCache.this.size)) {
|
||||
logger.info("updating [indices.cache.filter.size] from [{}] to [{}]", IndicesNodeFilterCache.this.size, size);
|
||||
IndicesNodeFilterCache.this.size = size;
|
||||
ConcurrentMap<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> oldCache = IndicesNodeFilterCache.this.cache;
|
||||
computeSizeInBytes();
|
||||
buildCache();
|
||||
oldCache.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue