mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 14:26:27 +00:00
amortize size based eviction on a scheduled task and not on each operation
This commit is contained in:
parent
0f251bf6ea
commit
318bdb91c6
@ -24,15 +24,17 @@ import org.elasticsearch.common.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
|
|||||||
import org.elasticsearch.common.concurrentlinkedhashmap.EvictionListener;
|
import org.elasticsearch.common.concurrentlinkedhashmap.EvictionListener;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.lucene.docset.DocSet;
|
import org.elasticsearch.common.lucene.docset.DocSet;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.cache.filter.support.AbstractWeightedFilterCache;
|
import org.elasticsearch.index.cache.filter.support.AbstractWeightedFilterCache;
|
||||||
import org.elasticsearch.index.cache.filter.support.FilterCacheValue;
|
import org.elasticsearch.index.cache.filter.support.FilterCacheValue;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class IndicesNodeFilterCache extends AbstractComponent implements EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> {
|
public class IndicesNodeFilterCache extends AbstractComponent implements EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> {
|
||||||
|
|
||||||
@ -43,11 +45,7 @@ public class IndicesNodeFilterCache extends AbstractComponent implements Evictio
|
|||||||
private final CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>> evictionListeners =
|
private final CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>> evictionListeners =
|
||||||
new CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>>();
|
new CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>>();
|
||||||
|
|
||||||
public IndicesNodeFilterCache() {
|
@Inject public IndicesNodeFilterCache(Settings settings, ThreadPool threadPool) {
|
||||||
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Inject public IndicesNodeFilterCache(Settings settings) {
|
|
||||||
super(settings);
|
super(settings);
|
||||||
|
|
||||||
String size = componentSettings.get("size", "20%");
|
String size = componentSettings.get("size", "20%");
|
||||||
@ -57,6 +55,7 @@ public class IndicesNodeFilterCache extends AbstractComponent implements Evictio
|
|||||||
} else {
|
} else {
|
||||||
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
|
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
|
||||||
}
|
}
|
||||||
|
TimeValue catchupTime = componentSettings.getAsTime("catchup", TimeValue.timeValueSeconds(10));
|
||||||
|
|
||||||
int weightedSize = (int) Math.min(sizeInBytes / AbstractWeightedFilterCache.FilterCacheValueWeigher.FACTOR, Integer.MAX_VALUE);
|
int weightedSize = (int) Math.min(sizeInBytes / AbstractWeightedFilterCache.FilterCacheValueWeigher.FACTOR, Integer.MAX_VALUE);
|
||||||
|
|
||||||
@ -64,6 +63,7 @@ public class IndicesNodeFilterCache extends AbstractComponent implements Evictio
|
|||||||
.maximumWeightedCapacity(weightedSize)
|
.maximumWeightedCapacity(weightedSize)
|
||||||
.weigher(new AbstractWeightedFilterCache.FilterCacheValueWeigher())
|
.weigher(new AbstractWeightedFilterCache.FilterCacheValueWeigher())
|
||||||
.listener(this)
|
.listener(this)
|
||||||
|
.catchup(threadPool.scheduler(), catchupTime.millis(), TimeUnit.MILLISECONDS)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
logger.debug("using [node] filter cache with size [{}]", new ByteSizeValue(sizeInBytes));
|
logger.debug("using [node] filter cache with size [{}]", new ByteSizeValue(sizeInBytes));
|
||||||
|
@ -112,6 +112,10 @@ public class ThreadPool extends AbstractComponent {
|
|||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ScheduledExecutorService scheduler() {
|
||||||
|
return this.scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
|
||||||
return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
|
return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user