SOLR-667 -- An LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce contention and synchronization overhead and utilize multiple CPU cores more effectively.

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@708656 13f79535-47bb-0310-9956-ffa450edef68
Shalin Shekhar Mangar 2008-10-28 20:13:49 +00:00
parent 5a7f0c6cd9
commit 6125bd15e6
4 changed files with 594 additions and 0 deletions

View File: CHANGES.txt

@@ -73,6 +73,10 @@ New Features
12. SOLR-795: SpellCheckComponent supports building indices on optimize if configured in solrconfig.xml
(Jason Rennie, shalin)
13. SOLR-667: An LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce
    contention and synchronization overhead and utilize multiple CPU cores more effectively.
    (Fuad Efendi, Noble Paul, yonik via shalin)
Optimizations
----------------------
1. SOLR-374: Use IndexReader.reopen to save resources by re-using parts of the

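For context on the contention this change addresses: a conventional LRU cache guards a single LinkedHashMap with one monitor, so every get() and put() from every thread serializes on the same lock. A minimal sketch of that pattern (illustrative only, not the existing Solr LRUCache source):

import java.util.LinkedHashMap;
import java.util.Map;

// Classic synchronized LRU map: correct, but all threads contend on one monitor,
// so additional CPU cores spend their time waiting rather than working.
class SynchronizedLRU<K, V> {
  private final Map<K, V> map;

  SynchronizedLRU(final int maxSize) {
    // accessOrder=true makes LinkedHashMap reorder entries on every get()
    map = new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
      }
    };
  }

  public synchronized V get(K key) { return map.get(key); }
  public synchronized V put(K key, V value) { return map.put(key, value); }
}

ConcurrentLRUCache below avoids this bottleneck: get() and put() go straight to a ConcurrentHashMap, and eviction is deferred to an occasional mark-and-sweep pass guarded by a tryLock.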
View File: org/apache/solr/common/util/ConcurrentLRUCache.java

@@ -0,0 +1,293 @@
package org.apache.solr.common.util;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
 * An LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce
 * contention and synchronization overhead, to utilize multiple CPU cores more effectively.
 *
 * Note that the implementation does not follow a true LRU (least-recently-used) eviction
 * strategy. Instead it strives to remove the least recently used items, but when the
 * initial cleanup does not bring the size down to the acceptable limit, it can remove
 * entries forcefully regardless of access order.
*
* @version $Id$
* @since solr 1.4
*/
public class ConcurrentLRUCache {
private Map<Object, CacheEntry> map;
private final int upperWaterMark, lowerWaterMark;
private volatile boolean stop = false;
private final ReentrantLock markAndSweepLock = new ReentrantLock(true);
private final boolean newThreadForCleanup;
private volatile boolean islive = true;
private final Stats stats = new Stats();
private final int acceptableWaterMark;
public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWaterMark, int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup, final int delay) {
if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");
if (lowerWaterMark >= upperWaterMark)
throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");
map = new ConcurrentHashMap<Object, CacheEntry>(initialSize);
newThreadForCleanup = runNewThreadForCleanup;
this.upperWaterMark = upperWaterMark;
this.lowerWaterMark = lowerWaterMark;
this.acceptableWaterMark = acceptableWaterMark;
if (runCleanupThread) {
  // background cleanup thread; exits once destroy() sets the stop flag
  new Thread() {
public void run() {
while (true) {
if (stop) break;
try {
Thread.sleep(delay * 1000);
} catch (InterruptedException e) {/*no op*/ }
markAndSweep();
}
}
}.start();
}
}
public void setAlive(boolean live) {
islive = live;
}
public Object get(Object key) {
CacheEntry e = map.get(key);
if (e == null) {
if (islive) stats.missCounter.incrementAndGet();
return null;
}
if (islive) e.lastAccessed = stats.accessCounter.incrementAndGet();
return e.value;
}
public Object remove(Object key) {
CacheEntry cacheEntry = map.remove(key);
if (cacheEntry != null) {
stats.size.decrementAndGet();
return cacheEntry.value;
}
return null;
}
public Object put(Object key, Object val) {
if (val == null) return null;
CacheEntry e = new CacheEntry(key, val, stats.accessCounter.incrementAndGet());
CacheEntry oldCacheEntry = map.put(key, e);
stats.size.incrementAndGet();
if (islive) {
stats.putCounter.incrementAndGet();
} else {
stats.nonLivePutCounter.incrementAndGet();
}
if (stats.size.get() > upperWaterMark) {
if (newThreadForCleanup) {
if (!markAndSweepLock.isLocked()) {
new Thread() {
public void run() {
markAndSweep();
}
}.start();
}
} else {
markAndSweep();
}
}
return oldCacheEntry == null ? null : oldCacheEntry.value;
}
private void markAndSweep() {
if (!markAndSweepLock.tryLock()) return;
try {
int size = stats.size.get();
long currentLatestAccessed = stats.accessCounter.get();
int itemsToBeRemoved = size - lowerWaterMark;
int itemsRemoved = 0;
if (itemsToBeRemoved < 1) return;
// currentLatestAccessed is the counter value of the item accessed most recently
// therefore remove all items whose last accessed counter is less than (currentLatestAccessed - lowerWaterMark)
long removeOlderThan = currentLatestAccessed - lowerWaterMark;
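      // e.g. with lowerWaterMark=90 and the access counter currently at 1000,
      // removeOlderThan=910: any entry not touched since access stamp 910 is a candidate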
      for (Map.Entry<Object, CacheEntry> entry : map.entrySet()) {
        if (entry.getValue().lastAccessed <= removeOlderThan && itemsRemoved < itemsToBeRemoved) {
          evictEntry(entry.getKey());
          itemsRemoved++;
        }
      }
      // Since the removal of items in the above loop depends on the value of lastAccessed,
      // some items may have been accessed between the time we recorded the number of items
      // to be removed and the actual removal, graduating above removeOlderThan and escaping
      // eviction. Therefore we check the size again: if it is not yet below
      // acceptableWaterMark, we remove items forcefully using a method that does not depend
      // on lastAccessed but is more costly to run.
      size = stats.size.get();
      // If the simple first pass brought the size down to acceptableWaterMark, we are done
      if (size <= acceptableWaterMark) return;
      // Otherwise remove items until the size drops to acceptableWaterMark
      itemsToBeRemoved = size - acceptableWaterMark;
TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();
      // This loop may evict a few newer items: we forcefully fill a bucket of fixed size
      // with the oldest candidates and remove them even if some were accessed again in the
      // meantime. The caveat is that this can cause extra cache misses, because an item
      // used very recently may be removed (against the philosophy of LRU).
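      // e.g. if 5 items must still go, the tree holds the 5 oldest entries seen so far;
      // a scanned entry displaces tree.first() (the newest of those 5) only if it is older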
for (Map.Entry<Object, CacheEntry> entry : map.entrySet()) {
CacheEntry v = entry.getValue();
v.lastAccessedCopy = v.lastAccessed;
if (tree.size() < itemsToBeRemoved) {
tree.add(v);
} else {
if (v.lastAccessedCopy < tree.first().lastAccessedCopy) {
tree.remove(tree.first());
tree.add(v);
}
}
}
for (CacheEntry sortCacheEntry : tree)
evictEntry(sortCacheEntry.key);
} finally {
markAndSweepLock.unlock();
}
}
  // called only from markAndSweep() while markAndSweepLock is held,
  // so the non-atomic evictionCounter increment below does not race
  private void evictEntry(Object key) {
    Object o = map.remove(key);
    if (o == null) return;
    stats.size.decrementAndGet();
    stats.evictionCounter++;
  }
  // Returns the 'n' most recently accessed items, most recent first. Takes the
  // markAndSweepLock so that lastAccessedCopy is not clobbered by a concurrent sweep.
  public Map getLatestAccessedItems(long n) {
    markAndSweepLock.lock();
Map result = new LinkedHashMap();
TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();
try {
for (Map.Entry<Object, CacheEntry> entry : map.entrySet()) {
CacheEntry ce = entry.getValue();
ce.lastAccessedCopy = ce.lastAccessed;
if (tree.size() < n) {
tree.add(ce);
} else {
        if (ce.lastAccessedCopy > tree.last().lastAccessedCopy) {
          tree.remove(tree.last());
          tree.add(ce);
        }
}
}
} finally {
markAndSweepLock.unlock();
}
for (CacheEntry e : tree) {
result.put(e.key, e.value);
}
return result;
}
public int size() {
return stats.size.get();
}
public void clear() {
map.clear();
}
public Map<Object, CacheEntry> getMap() {
return map;
}
private static class CacheEntry implements Comparable<CacheEntry> {
Object key, value;
volatile long lastAccessed = 0;
long lastAccessedCopy = 0;
public CacheEntry(Object key, Object value, long lastAccessed) {
this.key = key;
this.value = value;
this.lastAccessed = lastAccessed;
}
public void setLastAccessed(long lastAccessed) {
this.lastAccessed = lastAccessed;
}
    // descending order by lastAccessedCopy: the most recently accessed entry sorts first.
    // Access stamps come from a shared AtomicLong, so distinct entries never compare equal.
    public int compareTo(CacheEntry that) {
      if (this.lastAccessedCopy == that.lastAccessedCopy) return 0;
      return this.lastAccessedCopy < that.lastAccessedCopy ? 1 : -1;
    }
    public int hashCode() {
      return value.hashCode();
    }
    public boolean equals(Object obj) {
      // compare by value, mirroring hashCode(); guard against non-CacheEntry arguments
      return obj instanceof CacheEntry && value.equals(((CacheEntry) obj).value);
    }
public String toString() {
return "key: " + key + " value: " + value + " lastAccessed:" + lastAccessed;
}
}
public void destroy() {
stop = true;
if (map != null) {
map.clear();
map = null;
}
}
public Stats getStats() {
return stats;
}
protected void finalize() throws Throwable {
destroy();
super.finalize();
}
public static class Stats {
private final AtomicLong accessCounter = new AtomicLong(0),
putCounter = new AtomicLong(0),
nonLivePutCounter = new AtomicLong(0),
missCounter = new AtomicLong();
private final AtomicInteger size = new AtomicInteger();
private long evictionCounter = 0;
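    // accessCounter ticks on every put (live or not) and on every live hit, so:
    //   hits    = accessCounter - putCounter - nonLivePutCounter
    //   lookups = hits + misses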
public long getCumulativeLookups() {
return (accessCounter.get() - putCounter.get() - nonLivePutCounter.get()) + missCounter.get();
}
public long getCumulativeHits() {
return accessCounter.get() - putCounter.get() - nonLivePutCounter.get();
}
public long getCumulativePuts() {
return putCounter.get();
}
public long getCumulativeEvictions() {
return evictionCounter;
}
public int getCurrentSize() {
return size.get();
}
public long getCumulativeNonLivePuts() {
return nonLivePutCounter.get();
}
public long getCumulativeMisses() {
return missCounter.get();
}
}
}
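A minimal usage sketch of the class above (the demo class name and watermark values are illustrative). With the cleanup thread disabled, the put() that pushes the size past upperWaterMark runs markAndSweep() on the calling thread, pulling the size back down to lowerWaterMark:

import org.apache.solr.common.util.ConcurrentLRUCache;

public class ConcurrentLRUCacheDemo {
  public static void main(String[] args) {
    // upper=100, lower=90, acceptable=95, initialSize=64,
    // no cleanup thread: eviction runs inline on the putting thread
    ConcurrentLRUCache cache =
        new ConcurrentLRUCache(100, 90, 95, 64, false, false, -1);
    cache.setAlive(true);

    for (int i = 1; i <= 101; i++) {
      cache.put(i, "value-" + i);     // the 101st put triggers markAndSweep()
    }

    System.out.println(cache.size());                               // 90
    System.out.println(cache.get(1));                               // null (evicted)
    System.out.println(cache.getStats().getCumulativeEvictions());  // 11
    cache.destroy();
  }
}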

View File: org/apache/solr/search/FastLRUCache.java

@@ -0,0 +1,233 @@
package org.apache.solr.search;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.ConcurrentLRUCache;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrCore;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* SolrCache based on ConcurrentLRUCache implementation.
* <p/>
 * This implementation does not use a separate cleanup thread. Instead it uses the calling thread
 * itself to do the cleanup when the size of the cache exceeds the configured upper limit.
* <p/>
* Also see <a href="http://wiki.apache.org/solr/SolrCaching">SolrCaching</a>
*
* @version $Id$
* @see org.apache.solr.common.util.ConcurrentLRUCache
* @see org.apache.solr.search.SolrCache
* @since solr 1.4
*/
public class FastLRUCache implements SolrCache {
private List<ConcurrentLRUCache.Stats> cumulativeStats;
private long warmupTime = 0;
private String name;
private int autowarmCount;
private State state;
private CacheRegenerator regenerator;
private String description = "Concurrent LRU Cache";
private ConcurrentLRUCache cache;
public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
state = State.CREATED;
this.regenerator = regenerator;
name = (String) args.get("name");
String str = (String) args.get("size");
final int limit = str == null ? 1024 : Integer.parseInt(str);
int minLimit;
str = (String) args.get("minSize");
if (str == null) {
minLimit = (int) (limit * 0.9);
} else {
minLimit = Integer.parseInt(str);
}
int acceptableLimit;
str = (String) args.get("acceptableSize");
if (str == null) {
acceptableLimit = (int) (limit * 0.95);
} else {
acceptableLimit = Integer.parseInt(str);
}
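    // e.g. size=100 with no explicit overrides yields minSize=90, acceptableSize=95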
str = (String) args.get("initialSize");
final int initialSize = str == null ? 1024 : Integer.parseInt(str);
str = (String) args.get("autowarmCount");
autowarmCount = str == null ? 0 : Integer.parseInt(str);
description = "Concurrent LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize;
if (autowarmCount > 0) {
description += ", autowarmCount=" + autowarmCount
+ ", regenerator=" + regenerator;
}
description += ')';
cache = new ConcurrentLRUCache(limit, minLimit, acceptableLimit, initialSize, false, false, -1);
cache.setAlive(false);
if (persistence == null) {
// must be the first time a cache of this type is being created
// Use a CopyOnWriteArrayList since puts are very rare and iteration may be a frequent operation
// because it is used in getStatistics()
persistence = new CopyOnWriteArrayList<ConcurrentLRUCache.Stats>();
}
cumulativeStats = (List<ConcurrentLRUCache.Stats>) persistence;
cumulativeStats.add(cache.getStats());
return cumulativeStats;
}
public String name() {
return name;
}
public int size() {
return cache.size();
}
public Object put(Object key, Object value) {
return cache.put(key, value);
}
public Object get(Object key) {
return cache.get(key);
}
public void clear() {
cache.clear();
}
public void setState(State state) {
this.state = state;
cache.setAlive(state == State.LIVE);
}
public State getState() {
return state;
}
public void warm(SolrIndexSearcher searcher, SolrCache old) throws IOException {
if (regenerator == null) return;
long warmingStartTime = System.currentTimeMillis();
FastLRUCache other = (FastLRUCache) old;
// warm entries
if (autowarmCount != 0) {
int sz = other.size();
if (autowarmCount != -1) sz = Math.min(sz, autowarmCount);
Map items = other.cache.getLatestAccessedItems(sz);
Map.Entry[] itemsArr = new Map.Entry[items.size()];
int counter = 0;
for (Object mapEntry : items.entrySet()) {
itemsArr[counter++] = (Map.Entry) mapEntry;
}
for (int i = itemsArr.length - 1; i >= 0; i--) {
try {
boolean continueRegen = regenerator.regenerateItem(searcher,
this, old, itemsArr[i].getKey(), itemsArr[i].getValue());
if (!continueRegen) break;
}
      catch (Throwable e) {
        // 'log' is the shared Logger constant declared on the SolrCache interface
        SolrException.log(log, "Error during auto-warming of key:" + itemsArr[i].getKey(), e);
      }
}
}
warmupTime = System.currentTimeMillis() - warmingStartTime;
}
public void close() {
}
//////////////////////// SolrInfoMBeans methods //////////////////////
public String getName() {
return FastLRUCache.class.getName();
}
public String getVersion() {
return SolrCore.version;
}
public String getDescription() {
return description;
}
public Category getCategory() {
return Category.CACHE;
}
public String getSourceId() {
return "$Id$";
}
public String getSource() {
return "$URL$";
}
public URL[] getDocs() {
return null;
}
// returns a ratio, not a percent.
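  // e.g. lookups=4, hits=2 -> "0.50"; truncation means 199 hits in 200 lookups reports "0.99"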
private static String calcHitRatio(long lookups, long hits) {
if (lookups == 0) return "0.00";
if (lookups == hits) return "1.00";
int hundredths = (int) (hits * 100 / lookups); // rounded down
if (hundredths < 10) return "0.0" + hundredths;
return "0." + hundredths;
}
public NamedList getStatistics() {
NamedList<Serializable> lst = new SimpleOrderedMap<Serializable>();
ConcurrentLRUCache.Stats stats = cache.getStats();
long lookups = stats.getCumulativeLookups();
long hits = stats.getCumulativeHits();
long inserts = stats.getCumulativePuts();
long evictions = stats.getCumulativeEvictions();
long size = stats.getCurrentSize();
lst.add("lookups", lookups);
lst.add("hits", hits);
lst.add("hitratio", calcHitRatio(lookups, hits));
lst.add("inserts", inserts);
lst.add("evictions", evictions);
lst.add("size", size);
lst.add("warmupTime", warmupTime);
long clookups = 0;
long chits = 0;
long cinserts = 0;
long cevictions = 0;
// NOTE: It is safe to iterate on a CopyOnWriteArrayList
    for (ConcurrentLRUCache.Stats statistics : cumulativeStats) {
      clookups += statistics.getCumulativeLookups();
      chits += statistics.getCumulativeHits();
      cinserts += statistics.getCumulativePuts();
      cevictions += statistics.getCumulativeEvictions();
    }
}
lst.add("cumulative_lookups", clookups);
lst.add("cumulative_hits", chits);
lst.add("cumulative_hitratio", calcHitRatio(clookups, chits));
lst.add("cumulative_inserts", cinserts);
lst.add("cumulative_evictions", cevictions);
return lst;
}
public String toString() {
return name + getStatistics().toString();
}
}
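A standalone usage sketch of FastLRUCache (the demo class name and argument values are illustrative; no regenerator is supplied, so autowarming is a no-op). Because the underlying ConcurrentLRUCache is constructed with runCleanupThread=false, eviction cost is paid inline by whichever thread's put() crosses the size limit:

import org.apache.solr.search.FastLRUCache;
import org.apache.solr.search.SolrCache;
import java.util.HashMap;
import java.util.Map;

public class FastLRUCacheDemo {
  public static void main(String[] args) {
    FastLRUCache cache = new FastLRUCache();
    Map params = new HashMap();
    params.put("size", "100");        // upper limit; minSize defaults to 90
    params.put("initialSize", "16");
    cache.init(params, null, null);   // no persistence, no regenerator
    cache.setState(SolrCache.State.LIVE);

    for (int i = 1; i <= 101; i++) {
      cache.put(i, "v" + i);          // the put crossing 100 entries sweeps inline
    }

    System.out.println(cache.get(50));          // "v50" -- still cached
    System.out.println(cache.getStatistics());  // lookups, hits, hitratio, evictions, ...
  }
}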

View File: org/apache/solr/search/TestFastLRUCache.java

@@ -0,0 +1,64 @@
package org.apache.solr.search;
import junit.framework.TestCase;
import org.apache.solr.common.util.NamedList;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Test for FastLRUCache
*
* @version $Id$
* @see org.apache.solr.search.FastLRUCache
* @since solr 1.4
*/
public class TestFastLRUCache extends TestCase {
public void testSimple() throws IOException {
FastLRUCache sc = new FastLRUCache();
Map l = new HashMap();
l.put("size", "100");
l.put("initialSize", "10");
l.put("autowarmCount", "25");
CacheRegenerator cr = new CacheRegenerator() {
public boolean regenerateItem(SolrIndexSearcher newSearcher, SolrCache newCache,
SolrCache oldCache, Object oldKey, Object oldVal) throws IOException {
newCache.put(oldKey, oldVal);
return true;
}
};
Object o = sc.init(l, null, cr);
sc.setState(SolrCache.State.LIVE);
for (int i = 0; i < 101; i++) {
sc.put(i + 1, "" + (i + 1));
}
assertEquals("25", sc.get(25));
assertEquals(null, sc.get(110));
NamedList nl = sc.getStatistics();
assertEquals(2L, nl.get("lookups"));
assertEquals(1L, nl.get("hits"));
assertEquals(101L, nl.get("inserts"));
assertEquals(11L, nl.get("evictions"));
FastLRUCache scNew = new FastLRUCache();
scNew.init(l, o, cr);
scNew.warm(null, sc);
scNew.setState(SolrCache.State.LIVE);
scNew.put(103, "103");
assertEquals("90", scNew.get(90));
assertEquals(null, scNew.get(50));
nl = scNew.getStatistics();
assertEquals(2L, nl.get("lookups"));
assertEquals(1L, nl.get("hits"));
assertEquals(1L, nl.get("inserts"));
assertEquals(0L, nl.get("evictions"));
assertEquals(4L, nl.get("cumulative_lookups"));
assertEquals(2L, nl.get("cumulative_hits"));
assertEquals(102L, nl.get("cumulative_inserts"));
assertEquals(11L, nl.get("cumulative_evictions"));
}
}