From 0fc7b87070d2c7ea29fc2de03cfc5f77196e9aa2 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Sun, 23 Nov 2008 21:42:22 +0000 Subject: [PATCH] SOLR:667 generics, add cleanupThread, evictionListener to ConcurrentLRUCache git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@720051 13f79535-47bb-0310-9956-ffa450edef68 --- .../solr/common/util/ConcurrentLRUCache.java | 115 ++++++++++++------ .../org/apache/solr/search/FastLRUCache.java | 10 +- .../apache/solr/search/TestFastLRUCache.java | 2 +- 3 files changed, 87 insertions(+), 40 deletions(-) diff --git a/src/java/org/apache/solr/common/util/ConcurrentLRUCache.java b/src/java/org/apache/solr/common/util/ConcurrentLRUCache.java index 8d5201b0c19..53b8c2cfae2 100644 --- a/src/java/org/apache/solr/common/util/ConcurrentLRUCache.java +++ b/src/java/org/apache/solr/common/util/ConcurrentLRUCache.java @@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import java.lang.ref.WeakReference; /** * A LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce @@ -22,11 +23,10 @@ import java.util.concurrent.locks.ReentrantLock; * @version $Id$ * @since solr 1.4 */ -public class ConcurrentLRUCache { +public class ConcurrentLRUCache { private final ConcurrentHashMap map; private final int upperWaterMark, lowerWaterMark; - private volatile boolean stop = false; private final ReentrantLock markAndSweepLock = new ReentrantLock(true); private boolean isCleaning = false; // not volatile... piggybacked on other volatile vars private final boolean newThreadForCleanup; @@ -34,8 +34,12 @@ public class ConcurrentLRUCache { private final Stats stats = new Stats(); private final int acceptableWaterMark; private long oldestEntry = 0; // not volatile, only accessed in the cleaning method + private final EvictionListener evictionListener; + private CleanupThread cleanupThread ; - public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark, int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup, final int delay) { + public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark, + int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup, + EvictionListener evictionListener) { if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0"); if (lowerWaterMark >= upperWaterMark) throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark"); @@ -44,27 +48,20 @@ public class ConcurrentLRUCache { this.upperWaterMark = upperWaterMark; this.lowerWaterMark = lowerWaterMark; this.acceptableWaterMark = acceptableWatermark; + this.evictionListener = evictionListener; if (runCleanupThread) { - new Thread() { - public void run() { - while (true) { - if (stop) break; - try { - Thread.sleep(delay * 1000); - } catch (InterruptedException e) {/*no op*/ } - markAndSweep(); - } - } - }.start(); + cleanupThread = new CleanupThread(this); + cleanupThread.start(); } } + public void setAlive(boolean live) { islive = live; } - public Object get(Object key) { - CacheEntry e = map.get(key); + public V get(K key) { + CacheEntry e = map.get(key); if (e == null) { if (islive) stats.missCounter.incrementAndGet(); return null; @@ -73,16 +70,17 @@ public class ConcurrentLRUCache { return e.value; } - public Object remove(Object key) { - CacheEntry cacheEntry = map.remove(key); + public V remove(K key) { + CacheEntry cacheEntry = map.remove(key); if (cacheEntry != null) { stats.size.decrementAndGet(); + if(evictionListener != null) evictionListener.evictedEntry(cacheEntry.key , cacheEntry.value); return cacheEntry.value; } return null; } - public Object put(Object key, Object val) { + public Object put(K key, V val) { if (val == null) return null; CacheEntry e = new CacheEntry(key, val, stats.accessCounter.incrementAndGet()); CacheEntry oldCacheEntry = map.put(key, e); @@ -112,6 +110,8 @@ public class ConcurrentLRUCache { markAndSweep(); } }.start(); + } else if (cleanupThread != null){ + cleanupThread.wakeThread(); } else { markAndSweep(); } @@ -157,13 +157,13 @@ public class ConcurrentLRUCache { int wantToKeep = lowerWaterMark; int wantToRemove = sz - lowerWaterMark; - CacheEntry[] eset = new CacheEntry[sz]; + CacheEntry[] eset = new CacheEntry[sz]; int eSize = 0; // System.out.println("newestEntry="+newestEntry + " oldestEntry="+oldestEntry); // System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved)); - for (CacheEntry ce : map.values()) { + for (CacheEntry ce : map.values()) { // set lastAccessedCopy to avoid more volatile reads ce.lastAccessedCopy = ce.lastAccessed; long thisEntry = ce.lastAccessedCopy; @@ -209,7 +209,7 @@ public class ConcurrentLRUCache { // iterate backward to make it easy to remove items. for (int i=eSize-1; i>=0; i--) { - CacheEntry ce = eset[i]; + CacheEntry ce = eset[i]; long thisEntry = ce.lastAccessedCopy; if (thisEntry > newestEntry - wantToKeep) { @@ -258,7 +258,7 @@ public class ConcurrentLRUCache { PQueue queue = new PQueue(wantToRemove); for (int i=eSize-1; i>=0; i--) { - CacheEntry ce = eset[i]; + CacheEntry ce = eset[i]; long thisEntry = ce.lastAccessedCopy; if (thisEntry > newestEntry - wantToKeep) { @@ -308,7 +308,7 @@ public class ConcurrentLRUCache { // avoid using pop() since order doesn't matter anymore for (Object o : queue.getValues()) { if (o==null) continue; - CacheEntry ce = (CacheEntry)o; + CacheEntry ce = (CacheEntry)o; evictEntry(ce.key); numRemoved++; } @@ -355,18 +355,19 @@ public class ConcurrentLRUCache { } - private void evictEntry(Object key) { - Object o = map.remove(key); + private void evictEntry(K key) { + CacheEntry o = map.remove(key); if (o == null) return; stats.size.decrementAndGet(); stats.evictionCounter++; + if(evictionListener != null) evictionListener.evictedEntry(o.key,o.value); } public Map getLatestAccessedItems(long n) { // we need to grab the lock since we are changing lastAccessedCopy markAndSweepLock.lock(); - Map result = new LinkedHashMap(); + Map result = new LinkedHashMap(); TreeSet tree = new TreeSet(); try { for (Map.Entry entry : map.entrySet()) { @@ -384,7 +385,7 @@ public class ConcurrentLRUCache { } finally { markAndSweepLock.unlock(); } - for (CacheEntry e : tree) { + for (CacheEntry e : tree) { result.put(e.key, e.value); } return result; @@ -402,13 +403,14 @@ public class ConcurrentLRUCache { return map; } - private static class CacheEntry implements Comparable { - Object key, value; + private static class CacheEntry implements Comparable { + K key; + V value; volatile long lastAccessed = 0; long lastAccessedCopy = 0; - public CacheEntry(Object key, Object value, long lastAccessed) { + public CacheEntry(K key, V value, long lastAccessed) { this.key = key; this.value = value; this.lastAccessed = lastAccessed; @@ -438,17 +440,15 @@ public class ConcurrentLRUCache { public void destroy() { - stop = true; + if(cleanupThread != null){ + cleanupThread.stopThread(); + } } public Stats getStats() { return stats; } - protected void finalize() throws Throwable { - destroy(); - super.finalize(); - } public static class Stats { private final AtomicLong accessCounter = new AtomicLong(0), @@ -486,4 +486,47 @@ public class ConcurrentLRUCache { return missCounter.get(); } } + + public static interface EvictionListener{ + public void evictedEntry(K key, V value); + } + + private static class CleanupThread extends Thread { + private WeakReference cache; + + private boolean stop = false; + + public CleanupThread(ConcurrentLRUCache c) { + cache = new WeakReference(c); + } + + public void run() { + while (true) { + synchronized (this) { + if (stop) break; + try { + this.wait(); + } catch (InterruptedException e) {} + } + if (stop) break; + ConcurrentLRUCache c = cache.get(); + if(c == null) break; + c.markAndSweep(); + } + } + + void wakeThread() { + synchronized(this){ + this.notify(); + } + } + + void stopThread() { + synchronized(this){ + stop=true; + this.notify(); + } + } + } + } diff --git a/src/java/org/apache/solr/search/FastLRUCache.java b/src/java/org/apache/solr/search/FastLRUCache.java index 110c0a9e5a0..b458a15670a 100644 --- a/src/java/org/apache/solr/search/FastLRUCache.java +++ b/src/java/org/apache/solr/search/FastLRUCache.java @@ -62,22 +62,25 @@ public class FastLRUCache implements SolrCache { } else { acceptableLimit = Integer.parseInt(str); } - // acceptable limit should be somehwere between minLimit and limit + // acceptable limit should be somewhere between minLimit and limit acceptableLimit = Math.max(minLimit, acceptableLimit); str = (String) args.get("initialSize"); final int initialSize = str == null ? limit : Integer.parseInt(str); str = (String) args.get("autowarmCount"); autowarmCount = str == null ? 0 : Integer.parseInt(str); + str = (String) args.get("cleanupThread"); + boolean newThread = str == null ? false : Boolean.parseBoolean(str); - description = "Concurrent LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize + ", minSize="+minLimit + ", acceptableSize="+acceptableLimit; + description = "Concurrent LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize + + ", minSize="+minLimit + ", acceptableSize="+acceptableLimit+" ,cleanupThread ="+newThread; if (autowarmCount > 0) { description += ", autowarmCount=" + autowarmCount + ", regenerator=" + regenerator; } description += ')'; - cache = new ConcurrentLRUCache(limit, minLimit, acceptableLimit, initialSize, false, false, -1); + cache = new ConcurrentLRUCache(limit, minLimit, acceptableLimit, initialSize, newThread, false, null); cache.setAlive(false); if (persistence == null) { @@ -153,6 +156,7 @@ public class FastLRUCache implements SolrCache { public void close() { + cache.destroy(); } //////////////////////// SolrInfoMBeans methods ////////////////////// diff --git a/src/test/org/apache/solr/search/TestFastLRUCache.java b/src/test/org/apache/solr/search/TestFastLRUCache.java index f20c7b7e448..e4cb764d896 100644 --- a/src/test/org/apache/solr/search/TestFastLRUCache.java +++ b/src/test/org/apache/solr/search/TestFastLRUCache.java @@ -72,7 +72,7 @@ public class TestFastLRUCache extends TestCase { int upperWaterMark = (int)(lowerWaterMark * 1.1); Random r = new Random(0); - ConcurrentLRUCache cache = new ConcurrentLRUCache(upperWaterMark, lowerWaterMark, (upperWaterMark+lowerWaterMark)/2, upperWaterMark, false, false, 0); + ConcurrentLRUCache cache = new ConcurrentLRUCache(upperWaterMark, lowerWaterMark, (upperWaterMark+lowerWaterMark)/2, upperWaterMark, false, false, null); boolean getSize=false; int minSize=0,maxSize=0; for (int i=0; i