SOLR-667 generics, add cleanupThread, evictionListener to ConcurrentLRUCache

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@720051 13f79535-47bb-0310-9956-ffa450edef68
Yonik Seeley 2008-11-23 21:42:22 +00:00
parent 741fe7a24d
commit 0fc7b87070
3 changed files with 87 additions and 40 deletions

ConcurrentLRUCache.java

@@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+import java.lang.ref.WeakReference;
 
 /**
  * A LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce
@@ -22,11 +23,10 @@ import java.util.concurrent.locks.ReentrantLock;
  * @version $Id$
  * @since solr 1.4
  */
-public class ConcurrentLRUCache {
+public class ConcurrentLRUCache<K,V> {
   private final ConcurrentHashMap<Object, CacheEntry> map;
   private final int upperWaterMark, lowerWaterMark;
-  private volatile boolean stop = false;
   private final ReentrantLock markAndSweepLock = new ReentrantLock(true);
   private boolean isCleaning = false; // not volatile... piggybacked on other volatile vars
   private final boolean newThreadForCleanup;
@@ -34,8 +34,12 @@ public class ConcurrentLRUCache {
   private final Stats stats = new Stats();
   private final int acceptableWaterMark;
   private long oldestEntry = 0; // not volatile, only accessed in the cleaning method
+  private final EvictionListener<K,V> evictionListener;
+  private CleanupThread cleanupThread ;
 
-  public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark, int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup, final int delay) {
+  public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark,
+                            int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup,
+                            EvictionListener<K,V> evictionListener) {
     if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");
     if (lowerWaterMark >= upperWaterMark)
       throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");
@@ -44,27 +48,20 @@ public class ConcurrentLRUCache {
     this.upperWaterMark = upperWaterMark;
     this.lowerWaterMark = lowerWaterMark;
     this.acceptableWaterMark = acceptableWatermark;
+    this.evictionListener = evictionListener;
     if (runCleanupThread) {
-      new Thread() {
-        public void run() {
-          while (true) {
-            if (stop) break;
-            try {
-              Thread.sleep(delay * 1000);
-            } catch (InterruptedException e) {/*no op*/ }
-            markAndSweep();
-          }
-        }
-      }.start();
+      cleanupThread = new CleanupThread(this);
+      cleanupThread.start();
     }
   }
 
   public void setAlive(boolean live) {
     islive = live;
   }
 
-  public Object get(Object key) {
-    CacheEntry e = map.get(key);
+  public V get(K key) {
+    CacheEntry<K,V> e = map.get(key);
     if (e == null) {
       if (islive) stats.missCounter.incrementAndGet();
       return null;
@@ -73,16 +70,17 @@ public class ConcurrentLRUCache {
     return e.value;
   }
 
-  public Object remove(Object key) {
-    CacheEntry cacheEntry = map.remove(key);
+  public V remove(K key) {
+    CacheEntry<K,V> cacheEntry = map.remove(key);
     if (cacheEntry != null) {
       stats.size.decrementAndGet();
+      if(evictionListener != null) evictionListener.evictedEntry(cacheEntry.key , cacheEntry.value);
       return cacheEntry.value;
     }
     return null;
   }
 
-  public Object put(Object key, Object val) {
+  public Object put(K key, V val) {
     if (val == null) return null;
     CacheEntry e = new CacheEntry(key, val, stats.accessCounter.incrementAndGet());
     CacheEntry oldCacheEntry = map.put(key, e);
@@ -112,6 +110,8 @@ public class ConcurrentLRUCache {
             markAndSweep();
           }
         }.start();
+      } else if (cleanupThread != null){
+        cleanupThread.wakeThread();
       } else {
         markAndSweep();
       }
@@ -157,13 +157,13 @@ public class ConcurrentLRUCache {
     int wantToKeep = lowerWaterMark;
     int wantToRemove = sz - lowerWaterMark;
 
-    CacheEntry[] eset = new CacheEntry[sz];
+    CacheEntry<K,V>[] eset = new CacheEntry[sz];
     int eSize = 0;
 
     // System.out.println("newestEntry="+newestEntry + " oldestEntry="+oldestEntry);
     // System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));
 
-    for (CacheEntry ce : map.values()) {
+    for (CacheEntry<K,V> ce : map.values()) {
       // set lastAccessedCopy to avoid more volatile reads
       ce.lastAccessedCopy = ce.lastAccessed;
       long thisEntry = ce.lastAccessedCopy;
@@ -209,7 +209,7 @@ public class ConcurrentLRUCache {
 
       // iterate backward to make it easy to remove items.
       for (int i=eSize-1; i>=0; i--) {
-        CacheEntry ce = eset[i];
+        CacheEntry<K,V> ce = eset[i];
         long thisEntry = ce.lastAccessedCopy;
 
         if (thisEntry > newestEntry - wantToKeep) {
@@ -258,7 +258,7 @@ public class ConcurrentLRUCache {
 
       PQueue queue = new PQueue(wantToRemove);
       for (int i=eSize-1; i>=0; i--) {
-        CacheEntry ce = eset[i];
+        CacheEntry<K,V> ce = eset[i];
         long thisEntry = ce.lastAccessedCopy;
 
         if (thisEntry > newestEntry - wantToKeep) {
@@ -308,7 +308,7 @@ public class ConcurrentLRUCache {
       // avoid using pop() since order doesn't matter anymore
       for (Object o : queue.getValues()) {
         if (o==null) continue;
-        CacheEntry ce = (CacheEntry)o;
+        CacheEntry<K,V> ce = (CacheEntry)o;
         evictEntry(ce.key);
         numRemoved++;
       }
@@ -355,18 +355,19 @@ public class ConcurrentLRUCache {
     }
   }
 
-  private void evictEntry(Object key) {
-    Object o = map.remove(key);
+  private void evictEntry(K key) {
+    CacheEntry<K,V> o = map.remove(key);
     if (o == null) return;
     stats.size.decrementAndGet();
     stats.evictionCounter++;
+    if(evictionListener != null) evictionListener.evictedEntry(o.key,o.value);
   }
 
   public Map getLatestAccessedItems(long n) {
     // we need to grab the lock since we are changing lastAccessedCopy
     markAndSweepLock.lock();
-    Map result = new LinkedHashMap();
+    Map<K,V> result = new LinkedHashMap<K,V>();
     TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();
     try {
       for (Map.Entry<Object, CacheEntry> entry : map.entrySet()) {
@@ -384,7 +385,7 @@ public class ConcurrentLRUCache {
     } finally {
       markAndSweepLock.unlock();
     }
-    for (CacheEntry e : tree) {
+    for (CacheEntry<K,V> e : tree) {
       result.put(e.key, e.value);
     }
     return result;
@@ -402,13 +403,14 @@ public class ConcurrentLRUCache {
     return map;
   }
 
-  private static class CacheEntry implements Comparable<CacheEntry> {
-    Object key, value;
+  private static class CacheEntry<K,V> implements Comparable<CacheEntry> {
+    K key;
+    V value;
     volatile long lastAccessed = 0;
     long lastAccessedCopy = 0;
 
-    public CacheEntry(Object key, Object value, long lastAccessed) {
+    public CacheEntry(K key, V value, long lastAccessed) {
       this.key = key;
       this.value = value;
       this.lastAccessed = lastAccessed;
@@ -438,17 +440,15 @@ public class ConcurrentLRUCache {
 
   public void destroy() {
-    stop = true;
+    if(cleanupThread != null){
+      cleanupThread.stopThread();
+    }
   }
 
   public Stats getStats() {
     return stats;
   }
 
-  protected void finalize() throws Throwable {
-    destroy();
-    super.finalize();
-  }
-
   public static class Stats {
     private final AtomicLong accessCounter = new AtomicLong(0),
@@ -486,4 +486,47 @@ public class ConcurrentLRUCache {
       return missCounter.get();
     }
   }
+
+  public static interface EvictionListener<K,V>{
+    public void evictedEntry(K key, V value);
+  }
+
+  private static class CleanupThread extends Thread {
+    private WeakReference<ConcurrentLRUCache> cache;
+    private boolean stop = false;
+
+    public CleanupThread(ConcurrentLRUCache c) {
+      cache = new WeakReference<ConcurrentLRUCache>(c);
+    }
+
+    public void run() {
+      while (true) {
+        synchronized (this) {
+          if (stop) break;
+          try {
+            this.wait();
+          } catch (InterruptedException e) {}
+        }
+        if (stop) break;
+        ConcurrentLRUCache c = cache.get();
+        if(c == null) break;
+        c.markAndSweep();
+      }
+    }
+
+    void wakeThread() {
+      synchronized(this){
+        this.notify();
+      }
+    }
+
+    void stopThread() {
+      synchronized(this){
+        stop=true;
+        this.notify();
+      }
+    }
+  }
 }
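The reworked constructor and the new EvictionListener hook are easiest to see in a small usage sketch. Everything below is illustrative rather than taken from the commit: the String key/value types, the watermark values, and the omitted package/import lines are assumptions.

    // Illustrative only: a cache of String keys/values with the new background
    // CleanupThread enabled and a listener observing evictions.
    ConcurrentLRUCache<String, String> cache = new ConcurrentLRUCache<String, String>(
        1000,   // upperWaterMark: cleaning is triggered once the size exceeds this
        800,    // lowerWaterMark: cleaning shrinks the cache back toward this
        900,    // acceptableWaterMark
        1000,   // initialSize of the backing map
        true,   // runCleanupThread: use the new shared CleanupThread
        false,  // runNewThreadForCleanup: do not spawn a fresh thread per cleanup
        new ConcurrentLRUCache.EvictionListener<String, String>() {
          public void evictedEntry(String key, String value) {
            // called from evictEntry() during markAndSweep() and from remove()
            System.out.println("evicted: " + key);
          }
        });

    cache.put("k1", "v1");
    cache.destroy();  // stops the CleanupThread; replaces the removed finalize()-based shutdown

Note that the constructor no longer takes a sleep delay: the CleanupThread blocks in wait() and is woken by put() through wakeThread() when the cache grows past the upper water mark, and it holds the cache through a WeakReference so an abandoned cache can still be garbage collected now that finalize() is gone.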

FastLRUCache.java

@@ -62,22 +62,25 @@ public class FastLRUCache implements SolrCache {
     } else {
       acceptableLimit = Integer.parseInt(str);
     }
-    // acceptable limit should be somehwere between minLimit and limit
+    // acceptable limit should be somewhere between minLimit and limit
     acceptableLimit = Math.max(minLimit, acceptableLimit);
 
     str = (String) args.get("initialSize");
     final int initialSize = str == null ? limit : Integer.parseInt(str);
     str = (String) args.get("autowarmCount");
     autowarmCount = str == null ? 0 : Integer.parseInt(str);
+    str = (String) args.get("cleanupThread");
+    boolean newThread = str == null ? false : Boolean.parseBoolean(str);
 
-    description = "Concurrent LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize + ", minSize="+minLimit + ", acceptableSize="+acceptableLimit;
+    description = "Concurrent LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize +
+            ", minSize="+minLimit + ", acceptableSize="+acceptableLimit+" ,cleanupThread ="+newThread;
     if (autowarmCount > 0) {
       description += ", autowarmCount=" + autowarmCount
               + ", regenerator=" + regenerator;
     }
     description += ')';
 
-    cache = new ConcurrentLRUCache(limit, minLimit, acceptableLimit, initialSize, false, false, -1);
+    cache = new ConcurrentLRUCache(limit, minLimit, acceptableLimit, initialSize, newThread, false, null);
     cache.setAlive(false);
 
     if (persistence == null) {
@@ -153,6 +156,7 @@ public class FastLRUCache implements SolrCache {
 
   public void close() {
+    cache.destroy();
   }
 
   //////////////////////// SolrInfoMBeans methods //////////////////////
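For FastLRUCache the new behavior is switched on through a "cleanupThread" init argument, parsed above next to initialSize and autowarmCount. The map below is a hypothetical sketch of such a configuration: only "cleanupThread" comes from this hunk, while the other parameter names ("size", "minSize", "acceptableSize") and all of the values are assumptions about the existing FastLRUCache configuration.

    // Hypothetical args (java.util.Map / java.util.HashMap) as they might be
    // handed to FastLRUCache.init(args, persistence, regenerator).
    Map args = new HashMap();
    args.put("size", "1024");           // upper limit of the cache
    args.put("minSize", "768");         // size the cache is shrunk toward after cleanup
    args.put("acceptableSize", "896");  // clamped to at least minSize by init()
    args.put("initialSize", "1024");
    args.put("autowarmCount", "0");
    args.put("cleanupThread", "true");  // new: delegate cleanup to the shared CleanupThread

With cleanupThread=true the newThread flag is passed straight into the ConcurrentLRUCache constructor, and close() now calls cache.destroy() so the background thread is stopped together with the cache.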

TestFastLRUCache.java

@@ -72,7 +72,7 @@ public class TestFastLRUCache extends TestCase {
     int upperWaterMark = (int)(lowerWaterMark * 1.1);
     Random r = new Random(0);
-    ConcurrentLRUCache cache = new ConcurrentLRUCache(upperWaterMark, lowerWaterMark, (upperWaterMark+lowerWaterMark)/2, upperWaterMark, false, false, 0);
+    ConcurrentLRUCache cache = new ConcurrentLRUCache(upperWaterMark, lowerWaterMark, (upperWaterMark+lowerWaterMark)/2, upperWaterMark, false, false, null);
     boolean getSize=false;
     int minSize=0,maxSize=0;
     for (int i=0; i<iter; i++) {