parent
d9ff42f88a
commit
eb37a5992b
|
@ -26,10 +26,8 @@ import com.google.common.cache.CacheBuilder;
|
|||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.elasticsearch.cache.recycler.CacheRecycler;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.MemorySizeValue;
|
||||
|
@ -47,7 +45,6 @@ import java.util.concurrent.TimeUnit;
|
|||
public class IndicesFilterCache extends AbstractComponent implements RemovalListener<WeightedFilterCache.FilterCacheKey, DocIdSet> {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final CacheRecycler cacheRecycler;
|
||||
|
||||
private Cache<WeightedFilterCache.FilterCacheKey, DocIdSet> cache;
|
||||
|
||||
|
@ -91,10 +88,9 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
|
|||
}
|
||||
|
||||
@Inject
|
||||
public IndicesFilterCache(Settings settings, ThreadPool threadPool, CacheRecycler cacheRecycler, NodeSettingsService nodeSettingsService) {
|
||||
public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.cacheRecycler = cacheRecycler;
|
||||
this.size = componentSettings.get("size", "10%");
|
||||
this.expire = componentSettings.getAsTime("expire", null);
|
||||
this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60));
|
||||
|
@ -167,6 +163,10 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
|
|||
*/
|
||||
class ReaderCleaner implements Runnable {
|
||||
|
||||
// this is thread safe since we only schedule the next cleanup once the current one is
|
||||
// done, so no concurrent execution
|
||||
private final ObjectOpenHashSet<Object> keys = ObjectOpenHashSet.newInstance();
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (closed) {
|
||||
|
@ -180,33 +180,30 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
|
|||
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Recycler.V<ObjectOpenHashSet<Object>> keys = cacheRecycler.hashSet(-1);
|
||||
try {
|
||||
for (Iterator<Object> it = readersKeysToClean.iterator(); it.hasNext(); ) {
|
||||
keys.v().add(it.next());
|
||||
it.remove();
|
||||
}
|
||||
cache.cleanUp();
|
||||
if (!keys.v().isEmpty()) {
|
||||
for (Iterator<WeightedFilterCache.FilterCacheKey> it = cache.asMap().keySet().iterator(); it.hasNext(); ) {
|
||||
WeightedFilterCache.FilterCacheKey filterCacheKey = it.next();
|
||||
if (keys.v().contains(filterCacheKey.readerKey())) {
|
||||
// same as invalidate
|
||||
it.remove();
|
||||
}
|
||||
keys.clear();
|
||||
for (Iterator<Object> it = readersKeysToClean.iterator(); it.hasNext(); ) {
|
||||
keys.add(it.next());
|
||||
it.remove();
|
||||
}
|
||||
cache.cleanUp();
|
||||
if (!keys.isEmpty()) {
|
||||
for (Iterator<WeightedFilterCache.FilterCacheKey> it = cache.asMap().keySet().iterator(); it.hasNext(); ) {
|
||||
WeightedFilterCache.FilterCacheKey filterCacheKey = it.next();
|
||||
if (keys.contains(filterCacheKey.readerKey())) {
|
||||
// same as invalidate
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
schedule();
|
||||
} finally {
|
||||
keys.close();
|
||||
}
|
||||
schedule();
|
||||
keys.clear();
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Can not run ReaderCleaner - execution rejected", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void schedule() {
|
||||
try {
|
||||
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
|
||||
|
|
Loading…
Reference in New Issue