Fixing up where I messed up on previous checkin for SOLR-2906

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1230790 13f79535-47bb-0310-9956-ffa450edef68
Erick Erickson 2012-01-12 21:18:50 +00:00
parent 05a65507af
commit 2492bd2790
4 changed files with 0 additions and 2635 deletions


@@ -288,592 +288,3 @@ public class LFUCache<K, V> implements SolrCache<K, V> {
return name + getStatistics().toString();
}
}
package org.apache.solr.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.ConcurrentLFUCache;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* SolrCache based on ConcurrentLFUCache implementation.
* <p/>
* This implementation does not use a separate cleanup thread. Instead it uses the calling thread
* itself to do the cleanup when the size of the cache exceeds certain limits.
* <p/>
* Also see <a href="http://wiki.apache.org/solr/SolrCaching">SolrCaching</a>
* <p/>
* <b>This API is experimental and subject to change</b>
*
* @version $Id: LFUCache.java 1170772 2011-09-14 19:09:56Z sarowe $
* @see org.apache.solr.util.ConcurrentLFUCache
* @see org.apache.solr.search.SolrCache
* @since solr 3.6
*/
public class LFUCache<K, V> implements SolrCache<K, V> {
// contains the statistics objects for all open caches of the same type
private List<ConcurrentLFUCache.Stats> statsList;
private long warmupTime = 0;
private String name;
private int autowarmCount;
private State state;
private CacheRegenerator regenerator;
private String description = "Concurrent LFU Cache";
private ConcurrentLFUCache<K, V> cache;
private int showItems = 0;
private Boolean timeDecay = true;
public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
state = State.CREATED;
this.regenerator = regenerator;
name = (String) args.get("name");
String str = (String) args.get("size");
int limit = str == null ? 1024 : Integer.parseInt(str);
int minLimit;
str = (String) args.get("minSize");
if (str == null) {
minLimit = (int) (limit * 0.9);
} else {
minLimit = Integer.parseInt(str);
}
if (minLimit == 0) minLimit = 1;
if (limit <= minLimit) limit = minLimit + 1;
int acceptableSize;
str = (String) args.get("acceptableSize");
if (str == null) {
acceptableSize = (int) (limit * 0.95);
} else {
acceptableSize = Integer.parseInt(str);
}
// acceptable limit should be somewhere between minLimit and limit
acceptableSize = Math.max(minLimit, acceptableSize);
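// e.g. with size=1024 and minSize/acceptableSize omitted: minLimit = (int) (1024 * 0.9) = 921
// and acceptableSize = (int) (1024 * 0.95) = 972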
str = (String) args.get("initialSize");
final int initialSize = str == null ? limit : Integer.parseInt(str);
str = (String) args.get("autowarmCount");
autowarmCount = str == null ? 0 : Integer.parseInt(str);
str = (String) args.get("cleanupThread");
boolean newThread = str == null ? false : Boolean.parseBoolean(str);
str = (String) args.get("showItems");
showItems = str == null ? 0 : Integer.parseInt(str);
// Don't "simplify" this by removing the null test: the default is true, and without the
// test an omitted timeDecay param would parse to false.
str = (String) args.get("timeDecay");
timeDecay = (str == null) ? true : Boolean.parseBoolean(str);
description = "Concurrent LFU Cache(maxSize=" + limit + ", initialSize=" + initialSize +
", minSize=" + minLimit + ", acceptableSize=" + acceptableSize + ", cleanupThread=" + newThread +
", timeDecay=" + Boolean.toString(timeDecay);
if (autowarmCount > 0) {
description += ", autowarmCount=" + autowarmCount + ", regenerator=" + regenerator;
}
description += ')';
cache = new ConcurrentLFUCache<K, V>(limit, minLimit, acceptableSize, initialSize, newThread, false, null, timeDecay);
cache.setAlive(false);
statsList = (List<ConcurrentLFUCache.Stats>) persistence;
if (statsList == null) {
// must be the first time a cache of this type is being created
// Use a CopyOnWriteArrayList since puts are very rare and iteration may be a frequent operation
// because it is used in getStatistics()
statsList = new CopyOnWriteArrayList<ConcurrentLFUCache.Stats>();
// the first entry will be for cumulative stats of caches that have been closed.
statsList.add(new ConcurrentLFUCache.Stats());
}
statsList.add(cache.getStats());
return statsList;
}
public String name() {
return name;
}
public int size() {
return cache.size();
}
public V put(K key, V value) {
return cache.put(key, value);
}
public V get(K key) {
return cache.get(key);
}
public void clear() {
cache.clear();
}
public void setState(State state) {
this.state = state;
cache.setAlive(state == State.LIVE);
}
public State getState() {
return state;
}
public void warm(SolrIndexSearcher searcher, SolrCache old) throws IOException {
if (regenerator == null) return;
long warmingStartTime = System.currentTimeMillis();
LFUCache other = (LFUCache) old;
// warm entries
if (autowarmCount != 0) {
int sz = other.size();
if (autowarmCount != -1) sz = Math.min(sz, autowarmCount);
Map items = other.cache.getMostUsedItems(sz);
Map.Entry[] itemsArr = new Map.Entry[items.size()];
int counter = 0;
for (Object mapEntry : items.entrySet()) {
itemsArr[counter++] = (Map.Entry) mapEntry;
}
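// getMostUsedItems returns entries ordered from most used to least used, so walking the
// array backwards regenerates the least used first and leaves the most used entries with
// the freshest access stamps in the new cache.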
for (int i = itemsArr.length - 1; i >= 0; i--) {
try {
boolean continueRegen = regenerator.regenerateItem(searcher,
this, old, itemsArr[i].getKey(), itemsArr[i].getValue());
if (!continueRegen) break;
} catch (Throwable e) {
SolrException.log(log, "Error during auto-warming of key:" + itemsArr[i].getKey(), e);
}
}
}
warmupTime = System.currentTimeMillis() - warmingStartTime;
}
public void close() {
// add the stats to the cumulative stats object (the first in the statsList)
statsList.get(0).add(cache.getStats());
statsList.remove(cache.getStats());
cache.destroy();
}
//////////////////////// SolrInfoMBeans methods //////////////////////
public String getName() {
return LFUCache.class.getName();
}
public String getVersion() {
return SolrCore.version;
}
public String getDescription() {
return description;
}
public Category getCategory() {
return Category.CACHE;
}
public String getSourceId() {
return "$Id: LFUCache.java 1170772 2011-09-14 19:09:56Z sarowe $";
}
public String getSource() {
return "$URL: http://svn.apache.org/repos/asf/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/search/LFUCache.java $";
}
public URL[] getDocs() {
return null;
}
// returns a ratio, not a percent.
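// For example, calcHitRatio(200, 73) computes 73 * 100 / 200 = 36 and returns "0.36";
// lookups == 0 yields "0.00" and lookups == hits yields "1.00".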
private static String calcHitRatio(long lookups, long hits) {
if (lookups == 0) return "0.00";
if (lookups == hits) return "1.00";
int hundredths = (int) (hits * 100 / lookups); // rounded down
if (hundredths < 10) return "0.0" + hundredths;
return "0." + hundredths;
}
public NamedList getStatistics() {
NamedList<Serializable> lst = new SimpleOrderedMap<Serializable>();
if (cache == null) return lst;
ConcurrentLFUCache.Stats stats = cache.getStats();
long lookups = stats.getCumulativeLookups();
long hits = stats.getCumulativeHits();
long inserts = stats.getCumulativePuts();
long evictions = stats.getCumulativeEvictions();
long size = stats.getCurrentSize();
lst.add("lookups", lookups);
lst.add("hits", hits);
lst.add("hitratio", calcHitRatio(lookups, hits));
lst.add("inserts", inserts);
lst.add("evictions", evictions);
lst.add("size", size);
lst.add("warmupTime", warmupTime);
lst.add("timeDecay", timeDecay);
long clookups = 0;
long chits = 0;
long cinserts = 0;
long cevictions = 0;
// NOTE: It is safe to iterate on a CopyOnWriteArrayList
for (ConcurrentLFUCache.Stats statistics : statsList) {
clookups += statistics.getCumulativeLookups();
chits += statistics.getCumulativeHits();
cinserts += statistics.getCumulativePuts();
cevictions += statistics.getCumulativeEvictions();
}
lst.add("cumulative_lookups", clookups);
lst.add("cumulative_hits", chits);
lst.add("cumulative_hitratio", calcHitRatio(clookups, chits));
lst.add("cumulative_inserts", cinserts);
lst.add("cumulative_evictions", cevictions);
if (showItems != 0) {
Map items = cache.getMostUsedItems(showItems == -1 ? Integer.MAX_VALUE : showItems);
for (Map.Entry e : (Set<Map.Entry>) items.entrySet()) {
Object k = e.getKey();
Object v = e.getValue();
String ks = "item_" + k;
String vs = v.toString();
lst.add(ks, vs);
}
}
return lst;
}
@Override
public String toString() {
return name + getStatistics().toString();
}
}
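
For orientation, here is a minimal, hypothetical sketch of how the init/put/get contract above can be exercised on its own. The argument names are the ones parsed in init(); the class name LFUCacheSketch, the concrete values, and the standalone wiring (null persistence, no regenerator, State taken from the SolrCache interface) are illustrative assumptions rather than how Solr itself creates caches.

import java.util.HashMap;
import java.util.Map;

import org.apache.solr.search.LFUCache;
import org.apache.solr.search.SolrCache;

public class LFUCacheSketch {
    public static void main(String[] args) {
        Map<String, String> cacheArgs = new HashMap<String, String>();
        cacheArgs.put("name", "exampleCache");
        cacheArgs.put("size", "128");           // upper limit before cleanup kicks in
        cacheArgs.put("minSize", "100");        // target size after cleanup
        cacheArgs.put("acceptableSize", "115"); // clamped to be at least minSize
        cacheArgs.put("autowarmCount", "10");
        cacheArgs.put("timeDecay", "true");

        LFUCache<String, String> cache = new LFUCache<String, String>();
        // persistence and regenerator are null: stats start fresh and warm() would be a no-op
        cache.init(cacheArgs, null, null);
        cache.setState(SolrCache.State.LIVE);

        cache.put("q:foo", "result-for-foo");
        System.out.println(cache.get("q:foo"));      // hit
        System.out.println(cache.get("q:missing"));  // miss -> null
        System.out.println(cache.getStatistics());   // lookups=2, hits=1, inserts=1, ...

        cache.close(); // folds this cache's stats into the cumulative entry and destroys it
    }
}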


@@ -473,953 +473,3 @@ public class ConcurrentLFUCache<K, V> {
}
}
}
package org.apache.solr.util;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.ref.WeakReference;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
* An LFU cache implementation based upon ConcurrentHashMap.
* <p/>
* This is not a terribly efficient implementation. The tricks used in the
* LRU version were not directly usable; perhaps it might be possible to
* rewrite them with LFU in mind.
* <p/>
* <b>This API is experimental and subject to change</b>
*
* @version $Id: ConcurrentLFUCache.java 1170772 2011-09-14 19:09:56Z sarowe $
* @since solr 3.6
*/
public class ConcurrentLFUCache<K, V> {
private static Logger log = LoggerFactory.getLogger(ConcurrentLFUCache.class);
private final ConcurrentHashMap<Object, CacheEntry<K, V>> map;
private final int upperWaterMark, lowerWaterMark;
private final ReentrantLock markAndSweepLock = new ReentrantLock(true);
private boolean isCleaning = false; // not volatile... piggybacked on other volatile vars
private final boolean newThreadForCleanup;
private volatile boolean islive = true;
private final Stats stats = new Stats();
private final int acceptableWaterMark;
private long lowHitCount = 0; // not volatile, only accessed in the cleaning method
private final EvictionListener<K, V> evictionListener;
private CleanupThread cleanupThread;
private final boolean timeDecay;
public ConcurrentLFUCache(int upperWaterMark, final int lowerWaterMark, int acceptableSize,
int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup,
EvictionListener<K, V> evictionListener, boolean timeDecay) {
if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");
if (lowerWaterMark >= upperWaterMark)
throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");
map = new ConcurrentHashMap<Object, CacheEntry<K, V>>(initialSize);
newThreadForCleanup = runNewThreadForCleanup;
this.upperWaterMark = upperWaterMark;
this.lowerWaterMark = lowerWaterMark;
this.acceptableWaterMark = acceptableSize;
this.evictionListener = evictionListener;
this.timeDecay = timeDecay;
if (runCleanupThread) {
cleanupThread = new CleanupThread(this);
cleanupThread.start();
}
}
public ConcurrentLFUCache(int size, int lowerWatermark) {
this(size, lowerWatermark, (int) Math.floor((lowerWatermark + size) / 2),
(int) Math.ceil(0.75 * size), false, false, null, true);
}
public void setAlive(boolean live) {
islive = live;
}
public V get(K key) {
CacheEntry<K, V> e = map.get(key);
if (e == null) {
if (islive) stats.missCounter.incrementAndGet();
return null;
}
if (islive) {
e.lastAccessed = stats.accessCounter.incrementAndGet();
e.hits.incrementAndGet();
}
return e.value;
}
public V remove(K key) {
CacheEntry<K, V> cacheEntry = map.remove(key);
if (cacheEntry != null) {
stats.size.decrementAndGet();
return cacheEntry.value;
}
return null;
}
public V put(K key, V val) {
if (val == null) return null;
CacheEntry<K, V> e = new CacheEntry<K, V>(key, val, stats.accessCounter.incrementAndGet());
CacheEntry<K, V> oldCacheEntry = map.put(key, e);
int currentSize;
if (oldCacheEntry == null) {
currentSize = stats.size.incrementAndGet();
} else {
currentSize = stats.size.get();
}
if (islive) {
stats.putCounter.incrementAndGet();
} else {
stats.nonLivePutCounter.incrementAndGet();
}
// Check if we need to clear out old entries from the cache.
// isCleaning variable is checked instead of markAndSweepLock.isLocked()
// for performance because every put invocation will check until
// the size is back to an acceptable level.
//
// There is a race between the check and the call to markAndSweep, but
// it's unimportant because markAndSweep actually acquires the lock or returns if it can't.
//
// Thread safety note: isCleaning read is piggybacked (comes after) other volatile reads
// in this method.
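// Cleanup runs in one of three ways: a one-off thread when newThreadForCleanup is set,
// a wake-up of the long-lived CleanupThread when one was started, or inline on the
// calling thread (the mode LFUCache uses unless cleanupThread=true is configured).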
if (currentSize > upperWaterMark && !isCleaning) {
if (newThreadForCleanup) {
new Thread() {
@Override
public void run() {
markAndSweep();
}
}.start();
} else if (cleanupThread != null) {
cleanupThread.wakeThread();
} else {
markAndSweep();
}
}
return oldCacheEntry == null ? null : oldCacheEntry.value;
}
/**
* Removes items from the cache to bring the size back down
* to the 'lowerWaterMark' config parameter.
* <p/>
* A single pass is made over all entries: the 'size - lowerWaterMark' entries with the
* fewest hits (ties broken by least recent access) are collected in a TreeSet and then
* evicted.
*/
private void markAndSweep() {
if (!markAndSweepLock.tryLock()) return;
try {
long lowHitCount = this.lowHitCount;
isCleaning = true;
this.lowHitCount = lowHitCount; // volatile write to make isCleaning visible
int sz = stats.size.get();
int wantToRemove = sz - lowerWaterMark;
TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();
for (CacheEntry<K, V> ce : map.values()) {
// set hitsCopy to avoid later Atomic reads
ce.hitsCopy = ce.hits.get();
ce.lastAccessedCopy = ce.lastAccessed;
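// With timeDecay enabled, every sweep halves the stored hit count, so long-ago
// popularity gradually decays instead of protecting an entry forever.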
if (timeDecay) {
ce.hits.set(ce.hitsCopy >>> 1);
}
if (tree.size() < wantToRemove) {
tree.add(ce);
} else {
// If the hits are not equal, we can remove before adding
// which is slightly faster
if (ce.hitsCopy < tree.first().hitsCopy) {
tree.remove(tree.first());
tree.add(ce);
} else if (ce.hitsCopy == tree.first().hitsCopy) {
tree.add(ce);
tree.remove(tree.first());
}
}
}
for (CacheEntry<K, V> e : tree) {
evictEntry(e.key);
}
} finally {
isCleaning = false; // set before markAndSweep.unlock() for visibility
markAndSweepLock.unlock();
}
}
private void evictEntry(K key) {
CacheEntry<K, V> o = map.remove(key);
if (o == null) return;
stats.size.decrementAndGet();
stats.evictionCounter.incrementAndGet();
if (evictionListener != null) evictionListener.evictedEntry(o.key, o.value);
}
/**
* Returns 'n' number of least used entries present in this cache.
* <p/>
* This uses a TreeSet to collect the 'n' least used items ordered by ascending hitcount
* and returns a LinkedHashMap containing 'n' or less than 'n' entries.
*
* @param n the number of items needed
* @return a LinkedHashMap containing 'n' or less than 'n' entries
*/
public Map<K, V> getLeastUsedItems(int n) {
Map<K, V> result = new LinkedHashMap<K, V>();
if (n <= 0)
return result;
TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();
// we need to grab the lock since we are changing the copy variables
markAndSweepLock.lock();
try {
for (Map.Entry<Object, CacheEntry<K, V>> entry : map.entrySet()) {
CacheEntry ce = entry.getValue();
ce.hitsCopy = ce.hits.get();
ce.lastAccessedCopy = ce.lastAccessed;
if (tree.size() < n) {
tree.add(ce);
} else {
// If the hits are not equal, we can remove before adding
// which is slightly faster
if (ce.hitsCopy < tree.first().hitsCopy) {
tree.remove(tree.first());
tree.add(ce);
} else if (ce.hitsCopy == tree.first().hitsCopy) {
tree.add(ce);
tree.remove(tree.first());
}
}
}
} finally {
markAndSweepLock.unlock();
}
for (CacheEntry<K, V> e : tree) {
result.put(e.key, e.value);
}
return result;
}
/**
* Returns 'n' number of most used entries present in this cache.
* <p/>
* This uses a TreeSet to collect the 'n' most used items ordered by descending hitcount
* and returns a LinkedHashMap containing 'n' or less than 'n' entries.
*
* @param n the number of items needed
* @return a LinkedHashMap containing 'n' or less than 'n' entries
*/
public Map<K, V> getMostUsedItems(int n) {
Map<K, V> result = new LinkedHashMap<K, V>();
if (n <= 0)
return result;
TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();
// we need to grab the lock since we are changing the copy variables
markAndSweepLock.lock();
try {
for (Map.Entry<Object, CacheEntry<K, V>> entry : map.entrySet()) {
CacheEntry<K, V> ce = entry.getValue();
ce.hitsCopy = ce.hits.get();
ce.lastAccessedCopy = ce.lastAccessed;
if (tree.size() < n) {
tree.add(ce);
} else {
// If the hits are not equal, we can remove before adding
// which is slightly faster
if (ce.hitsCopy > tree.last().hitsCopy) {
tree.remove(tree.last());
tree.add(ce);
} else if (ce.hitsCopy == tree.last().hitsCopy) {
tree.add(ce);
tree.remove(tree.last());
}
}
}
} finally {
markAndSweepLock.unlock();
}
for (CacheEntry<K, V> e : tree) {
result.put(e.key, e.value);
}
return result;
}
public int size() {
return stats.size.get();
}
public void clear() {
map.clear();
}
public Map<Object, CacheEntry<K, V>> getMap() {
return map;
}
private static class CacheEntry<K, V> implements Comparable<CacheEntry<K, V>> {
K key;
V value;
volatile AtomicLong hits = new AtomicLong(0);
long hitsCopy = 0;
volatile long lastAccessed = 0;
long lastAccessedCopy = 0;
public CacheEntry(K key, V value, long lastAccessed) {
this.key = key;
this.value = value;
this.lastAccessed = lastAccessed;
}
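// Natural ordering: higher hit counts sort first; ties are broken so that the more
// recently accessed entry sorts first. The copy fields are used so the ordering stays
// stable while an entry sits in a TreeSet.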
public int compareTo(CacheEntry<K, V> that) {
if (this.hitsCopy == that.hitsCopy) {
if (this.lastAccessedCopy == that.lastAccessedCopy) {
return 0;
}
return this.lastAccessedCopy < that.lastAccessedCopy ? 1 : -1;
}
return this.hitsCopy < that.hitsCopy ? 1 : -1;
}
@Override
public int hashCode() {
return value.hashCode();
}
@Override
public boolean equals(Object obj) {
return value.equals(obj);
}
@Override
public String toString() {
return "key: " + key + " value: " + value + " hits:" + hits.get();
}
}
private boolean isDestroyed = false;
public void destroy() {
try {
if (cleanupThread != null) {
cleanupThread.stopThread();
}
} finally {
isDestroyed = true;
}
}
public Stats getStats() {
return stats;
}
public static class Stats {
private final AtomicLong accessCounter = new AtomicLong(0),
putCounter = new AtomicLong(0),
nonLivePutCounter = new AtomicLong(0),
missCounter = new AtomicLong();
private final AtomicInteger size = new AtomicInteger();
private AtomicLong evictionCounter = new AtomicLong();
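// accessCounter is incremented once per live hit and once per put (live or not), so
// hits = accesses - puts - nonLivePuts, and lookups = hits + misses.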
public long getCumulativeLookups() {
return (accessCounter.get() - putCounter.get() - nonLivePutCounter.get()) + missCounter.get();
}
public long getCumulativeHits() {
return accessCounter.get() - putCounter.get() - nonLivePutCounter.get();
}
public long getCumulativePuts() {
return putCounter.get();
}
public long getCumulativeEvictions() {
return evictionCounter.get();
}
public int getCurrentSize() {
return size.get();
}
public long getCumulativeNonLivePuts() {
return nonLivePutCounter.get();
}
public long getCumulativeMisses() {
return missCounter.get();
}
public void add(Stats other) {
accessCounter.addAndGet(other.accessCounter.get());
putCounter.addAndGet(other.putCounter.get());
nonLivePutCounter.addAndGet(other.nonLivePutCounter.get());
missCounter.addAndGet(other.missCounter.get());
evictionCounter.addAndGet(other.evictionCounter.get());
size.set(Math.max(size.get(), other.size.get()));
}
}
public static interface EvictionListener<K, V> {
public void evictedEntry(K key, V value);
}
private static class CleanupThread extends Thread {
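// The cache is held through a WeakReference so this thread does not keep it alive;
// once the cache has been garbage collected, cache.get() returns null and run() exits.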
private WeakReference<ConcurrentLFUCache> cache;
private boolean stop = false;
public CleanupThread(ConcurrentLFUCache c) {
cache = new WeakReference<ConcurrentLFUCache>(c);
}
@Override
public void run() {
while (true) {
synchronized (this) {
if (stop) break;
try {
this.wait();
} catch (InterruptedException e) {
}
}
if (stop) break;
ConcurrentLFUCache c = cache.get();
if (c == null) break;
c.markAndSweep();
}
}
void wakeThread() {
synchronized (this) {
this.notify();
}
}
void stopThread() {
synchronized (this) {
stop = true;
this.notify();
}
}
}
@Override
protected void finalize() throws Throwable {
try {
if (!isDestroyed) {
log.error("ConcurrentLFUCache was not destroyed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
destroy();
}
} finally {
super.finalize();
}
}
}
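
In the same spirit, a minimal standalone sketch of the ConcurrentLFUCache behaviour documented above; the water marks and key counts are made-up illustration values, and only the constructor and public methods shown in this file are used.

import org.apache.solr.util.ConcurrentLFUCache;

public class ConcurrentLFUCacheSketch {
    public static void main(String[] args) {
        // upperWaterMark=5, lowerWaterMark=3, acceptableSize=4, initialSize=4,
        // no cleanup thread and no one-off threads (sweeps run inline on the calling thread),
        // no eviction listener, timeDecay enabled.
        ConcurrentLFUCache<Integer, String> cache =
                new ConcurrentLFUCache<Integer, String>(5, 3, 4, 4, false, false, null, true);
        cache.setAlive(true);

        for (int i = 0; i < 10; i++) {
            cache.put(i, "value-" + i);
            cache.get(0); // keep key 0 "hot" so it survives the sweeps
        }

        // Every put that pushes the size above upperWaterMark sweeps it back down to
        // lowerWaterMark, evicting the least-hit entries; the frequently read key survives.
        System.out.println("size = " + cache.size());
        System.out.println("key 0 cached: " + (cache.get(0) != null));
        System.out.println("evictions = " + cache.getStats().getCumulativeEvictions());

        cache.destroy(); // required; finalize() logs an error if a cache is never destroyed
    }
}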


@@ -1,77 +1,3 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
<query>
<cache name="lfuCacheDecayFalse"
class="solr.search.LFUCache"
size="10"
initialSize="9"
timeDecay="false" />
<cache name="lfuCacheDecayTrue"
class="solr.search.LFUCache"
size="10"
initialSize="9"
timeDecay="true" />
<cache name="lfuCacheDecayDefault"
class="solr.search.LFUCache"
size="10"
initialSize="9" />
</query>
</config>

File diff suppressed because it is too large