SOLR-13558: Allow dynamic resizing of SolrCache-s.

This commit is contained in:
Andrzej Bialecki 2019-07-23 12:21:21 +02:00
parent 2b0efb69eb
commit 82a4614856
10 changed files with 683 additions and 160 deletions

View File

@ -48,6 +48,8 @@ Improvements
* SOLR-12368: Support InPlace DV updates for a field that does not yet exist in any documents
(hossman, Simon Willnauer, Adrien Grand, Munendra S N)
* SOLR-13558: Allow dynamic resizing of SolrCache-s. (ab)
Bug Fixes
----------------------

View File

@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.Map;
@ -52,6 +53,12 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K,V>,
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(FastLRUCache.class);
public static final String MIN_SIZE_PARAM = "minSize";
public static final String ACCEPTABLE_SIZE_PARAM = "acceptableSize";
public static final String INITIAL_SIZE_PARAM = "initialSize";
public static final String CLEANUP_THREAD_PARAM = "cleanupThread";
public static final String SHOW_ITEMS_PARAM = "showItems";
// contains the statistics objects for all open caches of the same type
private List<ConcurrentLRUCache.Stats> statsList;
@ -62,6 +69,12 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K,V>,
private int showItems = 0;
private long maxRamBytes;
private int sizeLimit;
private int minSizeLimit;
private int initialSize;
private int acceptableSize;
private boolean cleanupThread;
private long ramLowerWatermark;
private MetricsMap cacheMap;
private Set<String> metricNames = ConcurrentHashMap.newKeySet();
@ -70,45 +83,44 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K,V>,
@Override
public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
super.init(args, regenerator);
String str = (String) args.get("size");
int limit = str == null ? 1024 : Integer.parseInt(str);
int minLimit;
str = (String) args.get("minSize");
String str = (String) args.get(SIZE_PARAM);
sizeLimit = str == null ? 1024 : Integer.parseInt(str);
str = (String) args.get(MIN_SIZE_PARAM);
if (str == null) {
minLimit = (int) (limit * 0.9);
minSizeLimit = (int) (sizeLimit * 0.9);
} else {
minLimit = Integer.parseInt(str);
minSizeLimit = Integer.parseInt(str);
}
if (minLimit <= 0) minLimit = 1;
if (limit <= minLimit) limit=minLimit+1;
checkAndAdjustLimits();
int acceptableLimit;
str = (String) args.get("acceptableSize");
str = (String) args.get(ACCEPTABLE_SIZE_PARAM);
if (str == null) {
acceptableLimit = (int) (limit * 0.95);
acceptableSize = (int) (sizeLimit * 0.95);
} else {
acceptableLimit = Integer.parseInt(str);
acceptableSize = Integer.parseInt(str);
}
// acceptable limit should be somewhere between minLimit and limit
acceptableLimit = Math.max(minLimit, acceptableLimit);
acceptableSize = Math.max(minSizeLimit, acceptableSize);
str = (String) args.get("initialSize");
final int initialSize = str == null ? limit : Integer.parseInt(str);
str = (String) args.get("cleanupThread");
boolean newThread = str == null ? false : Boolean.parseBoolean(str);
str = (String) args.get(INITIAL_SIZE_PARAM);
initialSize = str == null ? sizeLimit : Integer.parseInt(str);
str = (String) args.get(CLEANUP_THREAD_PARAM);
cleanupThread = str == null ? false : Boolean.parseBoolean(str);
str = (String) args.get("showItems");
str = (String) args.get(SHOW_ITEMS_PARAM);
showItems = str == null ? 0 : Integer.parseInt(str);
str = (String) args.get("maxRamMB");
this.maxRamBytes = str == null ? Long.MAX_VALUE : (long) (Double.parseDouble(str) * 1024L * 1024L);
str = (String) args.get(MAX_RAM_MB_PARAM);
long maxRamMB = str == null ? -1 : (long) Double.parseDouble(str);
this.maxRamBytes = maxRamMB < 0 ? Long.MAX_VALUE : maxRamMB * 1024L * 1024L;
if (maxRamBytes != Long.MAX_VALUE) {
long ramLowerWatermark = Math.round(maxRamBytes * 0.8);
description = generateDescription(maxRamBytes, ramLowerWatermark, newThread);
cache = new ConcurrentLRUCache<K, V>(ramLowerWatermark, maxRamBytes, newThread, null);
ramLowerWatermark = Math.round(maxRamBytes * 0.8);
description = generateDescription(maxRamBytes, ramLowerWatermark, cleanupThread);
cache = new ConcurrentLRUCache<>(ramLowerWatermark, maxRamBytes, cleanupThread, null);
} else {
description = generateDescription(limit, initialSize, minLimit, acceptableLimit, newThread);
cache = new ConcurrentLRUCache<>(limit, minLimit, acceptableLimit, initialSize, newThread, false, null);
ramLowerWatermark = -1L;
description = generateDescription(sizeLimit, initialSize, minSizeLimit, acceptableSize, cleanupThread);
cache = new ConcurrentLRUCache<>(sizeLimit, minSizeLimit, acceptableSize, initialSize, cleanupThread, false, null);
}
cache.setAlive(false);
@ -127,6 +139,14 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K,V>,
return statsList;
}
protected String generateDescription() {
if (maxRamBytes != Long.MAX_VALUE) {
return generateDescription(maxRamBytes, ramLowerWatermark, cleanupThread);
} else {
return generateDescription(sizeLimit, initialSize, minSizeLimit, acceptableSize, cleanupThread);
}
}
/**
* @return Returns the description of this Cache.
*/
@ -259,6 +279,9 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K,V>,
map.put("inserts", inserts);
map.put("evictions", evictions);
map.put("size", size);
map.put("cleanupThread", cleanupThread);
map.put("ramBytesUsed", ramBytesUsed());
map.put("maxRamMB", maxRamBytes != Long.MAX_VALUE ? maxRamBytes / 1024L / 1024L : -1L);
map.put("warmupTime", warmupTime);
map.put("cumulative_lookups", clookups);
@ -306,6 +329,93 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K,V>,
RamUsageEstimator.sizeOfObject(cache) +
RamUsageEstimator.sizeOfObject(statsList);
}
@Override
public Map<String, Object> getResourceLimits() {
Map<String, Object> limits = new HashMap<>();
limits.put(SIZE_PARAM, cache.getStats().getCurrentSize());
limits.put(MIN_SIZE_PARAM, minSizeLimit);
limits.put(ACCEPTABLE_SIZE_PARAM, acceptableSize);
limits.put(CLEANUP_THREAD_PARAM, cleanupThread);
limits.put(SHOW_ITEMS_PARAM, showItems);
limits.put(MAX_RAM_MB_PARAM, maxRamBytes != Long.MAX_VALUE ? maxRamBytes / 1024L / 1024L : -1L);
return limits;
}
@Override
public void setResourceLimit(String limitName, Object val) {
if (CLEANUP_THREAD_PARAM.equals(limitName)) {
Boolean value;
try {
value = Boolean.parseBoolean(val.toString());
cleanupThread = value;
cache.setRunCleanupThread(cleanupThread);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid new value for boolean limit '" + limitName + "': " + val);
}
}
Number value;
try {
value = Long.parseLong(String.valueOf(val));
} catch (Exception e) {
throw new IllegalArgumentException("Invalid new value for numeric limit '" + limitName +"': " + val);
}
if (!limitName.equals(MAX_RAM_MB_PARAM)) {
if (value.intValue() <= 1) {
throw new IllegalArgumentException("Invalid new value for numeric limit '" + limitName +"': " + value);
}
}
if (value.longValue() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Invalid new value for numeric limit '" + limitName +"': " + value);
}
switch (limitName) {
case SIZE_PARAM:
sizeLimit = value.intValue();
checkAndAdjustLimits();
cache.setUpperWaterMark(sizeLimit);
cache.setLowerWaterMark(minSizeLimit);
break;
case MIN_SIZE_PARAM:
minSizeLimit = value.intValue();
checkAndAdjustLimits();
cache.setUpperWaterMark(sizeLimit);
cache.setLowerWaterMark(minSizeLimit);
break;
case ACCEPTABLE_SIZE_PARAM:
acceptableSize = value.intValue();
acceptableSize = Math.max(minSizeLimit, acceptableSize);
cache.setAcceptableWaterMark(acceptableSize);
break;
case MAX_RAM_MB_PARAM:
long maxRamMB = value.intValue();
maxRamBytes = maxRamMB < 0 ? Long.MAX_VALUE : maxRamMB * 1024L * 1024L;
if (maxRamMB < 0) {
ramLowerWatermark = Long.MIN_VALUE;
} else {
ramLowerWatermark = Math.round(maxRamBytes * 0.8);
}
cache.setRamUpperWatermark(maxRamBytes);
cache.setRamLowerWatermark(ramLowerWatermark);
break;
case SHOW_ITEMS_PARAM:
showItems = value.intValue();
break;
default:
throw new IllegalArgumentException("Unsupported limit '" + limitName + "'");
}
description = generateDescription();
}
private void checkAndAdjustLimits() {
if (minSizeLimit <= 0) minSizeLimit = 1;
if (sizeLimit <= minSizeLimit) {
if (sizeLimit > 1) {
minSizeLimit = sizeLimit - 1;
} else {
sizeLimit = minSizeLimit + 1;
}
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.solr.search;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.Map;
@ -55,6 +56,14 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(LFUCache.class);
public static final String TIME_DECAY_PARAM = "timeDecay";
public static final String CLEANUP_THREAD_PARAM = "cleanupThread";
public static final String INITIAL_SIZE_PARAM = "initialSize";
public static final String MIN_SIZE_PARAM = "minSize";
public static final String ACCEPTABLE_SIZE_PARAM = "acceptableSize";
public static final String AUTOWARM_COUNT_PARAM = "autowarmCount";
public static final String SHOW_ITEMS_PARAM = "showItems";
// contains the statistics objects for all open caches of the same type
private List<ConcurrentLFUCache.Stats> statsList;
@ -72,56 +81,53 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
private Set<String> metricNames = ConcurrentHashMap.newKeySet();
private MetricRegistry registry;
private int sizeLimit;
private int minSizeLimit;
private int initialSize;
private int acceptableSize;
private boolean cleanupThread;
@Override
public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
state = State.CREATED;
this.regenerator = regenerator;
name = (String) args.get(NAME);
String str = (String) args.get("size");
int limit = str == null ? 1024 : Integer.parseInt(str);
int minLimit;
str = (String) args.get("minSize");
String str = (String) args.get(SIZE_PARAM);
sizeLimit = str == null ? 1024 : Integer.parseInt(str);
str = (String) args.get(MIN_SIZE_PARAM);
if (str == null) {
minLimit = (int) (limit * 0.9);
minSizeLimit = (int) (sizeLimit * 0.9);
} else {
minLimit = Integer.parseInt(str);
minSizeLimit = Integer.parseInt(str);
}
if (minLimit == 0) minLimit = 1;
if (limit <= minLimit) limit = minLimit + 1;
checkAndAdjustLimits();
int acceptableSize;
str = (String) args.get("acceptableSize");
str = (String) args.get(ACCEPTABLE_SIZE_PARAM);
if (str == null) {
acceptableSize = (int) (limit * 0.95);
acceptableSize = (int) (sizeLimit * 0.95);
} else {
acceptableSize = Integer.parseInt(str);
}
// acceptable limit should be somewhere between minLimit and limit
acceptableSize = Math.max(minLimit, acceptableSize);
acceptableSize = Math.max(minSizeLimit, acceptableSize);
str = (String) args.get("initialSize");
final int initialSize = str == null ? limit : Integer.parseInt(str);
str = (String) args.get("autowarmCount");
str = (String) args.get(INITIAL_SIZE_PARAM);
initialSize = str == null ? sizeLimit : Integer.parseInt(str);
str = (String) args.get(AUTOWARM_COUNT_PARAM);
autowarmCount = str == null ? 0 : Integer.parseInt(str);
str = (String) args.get("cleanupThread");
boolean newThread = str == null ? false : Boolean.parseBoolean(str);
str = (String) args.get(CLEANUP_THREAD_PARAM);
cleanupThread = str == null ? false : Boolean.parseBoolean(str);
str = (String) args.get("showItems");
str = (String) args.get(SHOW_ITEMS_PARAM);
showItems = str == null ? 0 : Integer.parseInt(str);
// Don't make this "efficient" by removing the test, default is true and omitting the param will make it false.
str = (String) args.get("timeDecay");
str = (String) args.get(TIME_DECAY_PARAM);
timeDecay = (str == null) ? true : Boolean.parseBoolean(str);
description = "Concurrent LFU Cache(maxSize=" + limit + ", initialSize=" + initialSize +
", minSize=" + minLimit + ", acceptableSize=" + acceptableSize + ", cleanupThread=" + newThread +
", timeDecay=" + Boolean.toString(timeDecay);
if (autowarmCount > 0) {
description += ", autowarmCount=" + autowarmCount + ", regenerator=" + regenerator;
}
description += ')';
description = generateDescription();
cache = new ConcurrentLFUCache<>(limit, minLimit, acceptableSize, initialSize, newThread, false, null, timeDecay);
cache = new ConcurrentLFUCache<>(sizeLimit, minSizeLimit, acceptableSize, initialSize, cleanupThread, false, null, timeDecay);
cache.setAlive(false);
statsList = (List<ConcurrentLFUCache.Stats>) persistence;
@ -138,6 +144,17 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
return statsList;
}
private String generateDescription() {
String descr = "Concurrent LFU Cache(maxSize=" + sizeLimit + ", initialSize=" + initialSize +
", minSize=" + minSizeLimit + ", acceptableSize=" + acceptableSize + ", cleanupThread=" + cleanupThread +
", timeDecay=" + Boolean.toString(timeDecay);
if (autowarmCount > 0) {
descr += ", autowarmCount=" + autowarmCount + ", regenerator=" + regenerator;
}
descr += ')';
return descr;
}
@Override
public String name() {
return name;
@ -258,6 +275,7 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
map.put("warmupTime", warmupTime);
map.put("timeDecay", timeDecay);
map.put("cleanupThread", cleanupThread);
long clookups = 0;
long chits = 0;
@ -276,6 +294,7 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
map.put("cumulative_hitratio", calcHitRatio(clookups, chits));
map.put("cumulative_inserts", cinserts);
map.put("cumulative_evictions", cevictions);
map.put("ramBytesUsed", ramBytesUsed());
if (detailed && showItems != 0) {
Map items = cache.getMostUsedItems(showItems == -1 ? Integer.MAX_VALUE : showItems);
@ -325,4 +344,89 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
RamUsageEstimator.sizeOfObject(cache);
}
}
@Override
public Map<String, Object> getResourceLimits() {
Map<String, Object> limits = new HashMap<>();
limits.put(SIZE_PARAM, cache.getStats().getCurrentSize());
limits.put(MIN_SIZE_PARAM, minSizeLimit);
limits.put(ACCEPTABLE_SIZE_PARAM, acceptableSize);
limits.put(AUTOWARM_COUNT_PARAM, autowarmCount);
limits.put(CLEANUP_THREAD_PARAM, cleanupThread);
limits.put(SHOW_ITEMS_PARAM, showItems);
limits.put(TIME_DECAY_PARAM, timeDecay);
return limits;
}
@Override
public synchronized void setResourceLimit(String limitName, Object val) {
if (TIME_DECAY_PARAM.equals(limitName) || CLEANUP_THREAD_PARAM.equals(limitName)) {
Boolean value;
try {
value = Boolean.parseBoolean(String.valueOf(val));
} catch (Exception e) {
throw new IllegalArgumentException("Invalid value of boolean limit '" + limitName + "': " + val);
}
switch (limitName) {
case TIME_DECAY_PARAM:
timeDecay = value;
cache.setTimeDecay(timeDecay);
break;
case CLEANUP_THREAD_PARAM:
cleanupThread = value;
cache.setRunCleanupThread(cleanupThread);
break;
}
} else {
Number value;
try {
value = Long.parseLong(String.valueOf(val));
} catch (Exception e) {
throw new IllegalArgumentException("Invalid new value for numeric limit '" + limitName +"': " + val);
}
if (value.intValue() <= 1 || value.longValue() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Out of range new value for numeric limit '" + limitName +"': " + value);
}
switch (limitName) {
case SIZE_PARAM:
sizeLimit = value.intValue();
checkAndAdjustLimits();
cache.setUpperWaterMark(sizeLimit);
cache.setLowerWaterMark(minSizeLimit);
break;
case MIN_SIZE_PARAM:
minSizeLimit = value.intValue();
checkAndAdjustLimits();
cache.setUpperWaterMark(sizeLimit);
cache.setLowerWaterMark(minSizeLimit);
break;
case ACCEPTABLE_SIZE_PARAM:
acceptableSize = value.intValue();
acceptableSize = Math.max(minSizeLimit, acceptableSize);
cache.setAcceptableWaterMark(acceptableSize);
break;
case AUTOWARM_COUNT_PARAM:
autowarmCount = value.intValue();
break;
case SHOW_ITEMS_PARAM:
showItems = value.intValue();
break;
default:
throw new IllegalArgumentException("Unsupported numeric limit '" + limitName + "'");
}
}
description = generateDescription();
}
private void checkAndAdjustLimits() {
if (minSizeLimit <= 0) minSizeLimit = 1;
if (sizeLimit <= minSizeLimit) {
if (sizeLimit > 1) {
minSizeLimit = sizeLimit - 1;
} else {
sizeLimit = minSizeLimit + 1;
}
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.solr.search;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@ -75,6 +76,8 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
private MetricsMap cacheMap;
private Set<String> metricNames = ConcurrentHashMap.newKeySet();
private MetricRegistry registry;
private int sizeLimit;
private int initialSize;
private long maxRamBytes = Long.MAX_VALUE;
// The synchronization used for the map will be used to update this,
@ -84,50 +87,54 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
@Override
public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
super.init(args, regenerator);
String str = (String)args.get("size");
final int limit = str==null ? 1024 : Integer.parseInt(str);
String str = (String)args.get(SIZE_PARAM);
this.sizeLimit = str==null ? 1024 : Integer.parseInt(str);
str = (String)args.get("initialSize");
final int initialSize = Math.min(str==null ? 1024 : Integer.parseInt(str), limit);
str = (String) args.get("maxRamMB");
final long maxRamBytes = this.maxRamBytes = str == null ? Long.MAX_VALUE : (long) (Double.parseDouble(str) * 1024L * 1024L);
description = generateDescription(limit, initialSize);
initialSize = Math.min(str==null ? 1024 : Integer.parseInt(str), sizeLimit);
str = (String) args.get(MAX_RAM_MB_PARAM);
this.maxRamBytes = str == null ? Long.MAX_VALUE : (long) (Double.parseDouble(str) * 1024L * 1024L);
description = generateDescription();
map = new LinkedHashMap<K,V>(initialSize, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
if (size() > limit || ramBytesUsed > maxRamBytes) {
if (maxRamBytes != Long.MAX_VALUE && ramBytesUsed > maxRamBytes) {
long bytesToDecrement = 0;
Iterator<Map.Entry<K, V>> iterator = entrySet().iterator();
do {
Map.Entry<K, V> entry = iterator.next();
if (entry.getKey() != null) {
bytesToDecrement += RamUsageEstimator.sizeOfObject(entry.getKey(), QUERY_DEFAULT_RAM_BYTES_USED);
}
if (entry.getValue() != null) {
bytesToDecrement += ((Accountable) entry.getValue()).ramBytesUsed();
}
bytesToDecrement += LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
ramBytesUsed -= bytesToDecrement;
iterator.remove();
evictions++;
evictionsRamUsage++;
stats.evictions.increment();
stats.evictionsRamUsage.increment();
} while (iterator.hasNext() && ramBytesUsed > maxRamBytes);
// must return false according to javadocs of removeEldestEntry if we're modifying
// the map ourselves
return false;
} else {
if (ramBytesUsed > getMaxRamBytes()) {
Iterator<Map.Entry<K, V>> iterator = entrySet().iterator();
do {
Map.Entry<K, V> entry = iterator.next();
long bytesToDecrement = RamUsageEstimator.sizeOfObject(entry.getKey(), QUERY_DEFAULT_RAM_BYTES_USED);
bytesToDecrement += RamUsageEstimator.sizeOfObject(entry.getValue(), QUERY_DEFAULT_RAM_BYTES_USED);
bytesToDecrement += LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
ramBytesUsed -= bytesToDecrement;
iterator.remove();
evictions++;
evictionsRamUsage++;
stats.evictions.increment();
stats.evictionsRamUsage.increment();
} while (iterator.hasNext() && ramBytesUsed > getMaxRamBytes());
// must return false according to javadocs of removeEldestEntry if we're modifying
// the map ourselves
return false;
} else if (size() > getSizeLimit()) {
Iterator<Map.Entry<K, V>> iterator = entrySet().iterator();
do {
Map.Entry<K, V> entry = iterator.next();
long bytesToDecrement = RamUsageEstimator.sizeOfObject(entry.getKey(), QUERY_DEFAULT_RAM_BYTES_USED);
bytesToDecrement += RamUsageEstimator.sizeOfObject(entry.getValue(), QUERY_DEFAULT_RAM_BYTES_USED);
bytesToDecrement += LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
ramBytesUsed -= bytesToDecrement;
// increment evictions regardless of state.
// this doesn't need to be synchronized because it will
// only be called in the context of a higher level synchronized block.
iterator.remove();
evictions++;
stats.evictions.increment();
return true;
}
} while (iterator.hasNext() && size() > getSizeLimit());
// must return false according to javadocs of removeEldestEntry if we're modifying
// the map ourselves
return false;
}
// neither size nor RAM exceeded - ok to keep the entry
return false;
}
};
@ -142,17 +149,25 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
return persistence;
}
public int getSizeLimit() {
return sizeLimit;
}
public long getMaxRamBytes() {
return maxRamBytes;
}
/**
*
* @return Returns the description of this cache.
*/
private String generateDescription(int limit, int initialSize) {
String description = "LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize;
private String generateDescription() {
String description = "LRU Cache(maxSize=" + getSizeLimit() + ", initialSize=" + initialSize;
if (isAutowarmingOn()) {
description += ", " + getAutowarmDescription();
}
if (maxRamBytes != Long.MAX_VALUE) {
description += ", maxRamMB=" + (maxRamBytes / 1024L / 1024L);
if (getMaxRamBytes() != Long.MAX_VALUE) {
description += ", maxRamMB=" + (getMaxRamBytes() / 1024L / 1024L);
}
description += ')';
return description;
@ -167,6 +182,9 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
@Override
public V put(K key, V value) {
if (sizeLimit == Integer.MAX_VALUE && maxRamBytes == Long.MAX_VALUE) {
throw new IllegalStateException("Cache: " + getName() + " has neither size nor RAM limit!");
}
synchronized (map) {
if (getState() == State.LIVE) {
stats.inserts.increment();
@ -177,32 +195,15 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
inserts++;
// important to calc and add new ram bytes first so that removeEldestEntry can compare correctly
if (maxRamBytes != Long.MAX_VALUE) {
long keySize = 0;
if (key != null) {
keySize = RamUsageEstimator.sizeOfObject(key, QUERY_DEFAULT_RAM_BYTES_USED);
}
long valueSize = 0;
if (value != null) {
if (value instanceof Accountable) {
Accountable accountable = (Accountable) value;
valueSize = accountable.ramBytesUsed();
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Cache: "
+ getName() + " is configured with maxRamBytes=" + RamUsageEstimator.humanReadableUnits(maxRamBytes)
+ " but its values do not implement org.apache.lucene.util.Accountable");
}
}
ramBytesUsed += keySize + valueSize + LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
}
long keySize = RamUsageEstimator.sizeOfObject(key, QUERY_DEFAULT_RAM_BYTES_USED);
long valueSize = RamUsageEstimator.sizeOfObject(value, QUERY_DEFAULT_RAM_BYTES_USED);
ramBytesUsed += keySize + valueSize + LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
V old = map.put(key, value);
if (maxRamBytes != Long.MAX_VALUE && old != null) {
long bytesToDecrement = ((Accountable) old).ramBytesUsed();
if (old != null) {
long bytesToDecrement = RamUsageEstimator.sizeOfObject(old, QUERY_DEFAULT_RAM_BYTES_USED);
// the key existed in the map but we added its size before the put, so let's back out
bytesToDecrement += LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
if (key != null) {
bytesToDecrement += RamUsageEstimator.sizeOfObject(key, QUERY_DEFAULT_RAM_BYTES_USED);
}
bytesToDecrement += RamUsageEstimator.sizeOfObject(key, QUERY_DEFAULT_RAM_BYTES_USED);
ramBytesUsed -= bytesToDecrement;
}
return old;
@ -318,11 +319,9 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
res.put("inserts", inserts);
res.put("evictions", evictions);
res.put("size", map.size());
if (maxRamBytes != Long.MAX_VALUE) {
res.put("maxRamMB", maxRamBytes / 1024L / 1024L);
res.put("ramBytesUsed", ramBytesUsed());
res.put("evictionsRamUsage", evictionsRamUsage);
}
res.put("ramBytesUsed", ramBytesUsed());
res.put("maxRamMB", maxRamBytes != Long.MAX_VALUE ? maxRamBytes / 1024L / 1024L : -1L);
res.put("evictionsRamUsage", evictionsRamUsage);
}
res.put("warmupTime", warmupTime);
@ -333,9 +332,7 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
res.put("cumulative_hitratio", calcHitRatio(clookups, chits));
res.put("cumulative_inserts", stats.inserts.longValue());
res.put("cumulative_evictions", stats.evictions.longValue());
if (maxRamBytes != Long.MAX_VALUE) {
res.put("cumulative_evictionsRamUsage", stats.evictionsRamUsage.longValue());
}
res.put("cumulative_evictionsRamUsage", stats.evictionsRamUsage.longValue());
});
manager.registerGauge(this, registryName, cacheMap, tag, true, scope, getCategory().toString());
}
@ -368,4 +365,46 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
return Accountables.namedAccountables(getName(), (Map<?, ? extends Accountable>) map);
}
}
@Override
public Map<String, Object> getResourceLimits() {
Map<String, Object> limits = new HashMap<>();
limits.put(SIZE_PARAM, sizeLimit);
limits.put(MAX_RAM_MB_PARAM, maxRamBytes != Long.MAX_VALUE ? maxRamBytes / 1024L / 1024L : -1L);
return limits;
}
@Override
public void setResourceLimit(String limitName, Object val) {
if (!(val instanceof Number)) {
try {
val = Long.parseLong(String.valueOf(val));
} catch (Exception e) {
throw new IllegalArgumentException("Unsupported value type (not a number) for limit '" + limitName + "': " + val + " (" + val.getClass().getName() + ")");
}
}
Number value = (Number)val;
if (value.longValue() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Invalid new value for limit '" + limitName +"': " + value);
}
switch (limitName) {
case SIZE_PARAM:
if (value.intValue() > 0) {
sizeLimit = value.intValue();
} else {
sizeLimit = Integer.MAX_VALUE;
}
break;
case MAX_RAM_MB_PARAM:
if (value.intValue() > 0) {
maxRamBytes = value.intValue() * 1024L * 1024L;
} else {
maxRamBytes = Long.MAX_VALUE;
}
break;
default:
throw new IllegalArgumentException("Unsupported limit name '" + limitName + "'");
}
description = generateDescription();
}
}

View File

@ -27,6 +27,9 @@ import java.util.Map;
*/
public interface SolrCache<K,V> extends SolrInfoBean, SolrMetricProducer {
String SIZE_PARAM = "size";
String MAX_RAM_MB_PARAM = "maxRamMB";
/**
* The initialization routine. Instance specific arguments are passed in
* the <code>args</code> map.
@ -126,4 +129,20 @@ public interface SolrCache<K,V> extends SolrInfoBean, SolrMetricProducer {
/** Frees any non-memory resources */
public void close();
/** Report current resource limits. */
public Map<String, Object> getResourceLimits();
/** Set resource limits. */
default void setResourceLimits(Map<String, Object> limits) throws Exception {
if (limits == null || limits.isEmpty()) {
return;
}
for (Map.Entry<String, Object> entry : limits.entrySet()) {
setResourceLimit(entry.getKey(), entry.getValue());
}
}
/** Set a named resource limit. */
public void setResourceLimit(String limitName, Object value) throws Exception;
}

View File

@ -54,37 +54,32 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
private final ConcurrentHashMap<Object, CacheEntry<K, V>> map;
private final int upperWaterMark, lowerWaterMark;
private int upperWaterMark, lowerWaterMark;
private final ReentrantLock markAndSweepLock = new ReentrantLock(true);
private boolean isCleaning = false; // not volatile... piggybacked on other volatile vars
private final boolean newThreadForCleanup;
private boolean newThreadForCleanup;
private boolean runCleanupThread;
private volatile boolean islive = true;
private final Stats stats = new Stats();
@SuppressWarnings("unused")
private final int acceptableWaterMark;
private int acceptableWaterMark;
private long lowHitCount = 0; // not volatile, only accessed in the cleaning method
private final EvictionListener<K, V> evictionListener;
private CleanupThread cleanupThread;
private final boolean timeDecay;
private boolean timeDecay;
private final AtomicLong ramBytes = new AtomicLong(0);
public ConcurrentLFUCache(int upperWaterMark, final int lowerWaterMark, int acceptableSize,
int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup,
EvictionListener<K, V> evictionListener, boolean timeDecay) {
if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");
if (lowerWaterMark >= upperWaterMark)
throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");
setUpperWaterMark(upperWaterMark);
setLowerWaterMark(lowerWaterMark);
setAcceptableWaterMark(acceptableSize);
map = new ConcurrentHashMap<>(initialSize);
newThreadForCleanup = runNewThreadForCleanup;
this.upperWaterMark = upperWaterMark;
this.lowerWaterMark = lowerWaterMark;
this.acceptableWaterMark = acceptableSize;
this.evictionListener = evictionListener;
this.timeDecay = timeDecay;
if (runCleanupThread) {
cleanupThread = new CleanupThread(this);
cleanupThread.start();
}
setNewThreadForCleanup(runNewThreadForCleanup);
setTimeDecay(timeDecay);
setRunCleanupThread(runCleanupThread);
}
public ConcurrentLFUCache(int size, int lowerWatermark) {
@ -96,6 +91,44 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
islive = live;
}
public void setUpperWaterMark(int upperWaterMark) {
if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");
this.upperWaterMark = upperWaterMark;
}
public void setLowerWaterMark(int lowerWaterMark) {
if (lowerWaterMark >= upperWaterMark)
throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");
this.lowerWaterMark = lowerWaterMark;
}
public void setAcceptableWaterMark(int acceptableWaterMark) {
this.acceptableWaterMark = acceptableWaterMark;
}
public void setTimeDecay(boolean timeDecay) {
this.timeDecay = timeDecay;
}
public synchronized void setNewThreadForCleanup(boolean newThreadForCleanup) {
this.newThreadForCleanup = newThreadForCleanup;
}
public synchronized void setRunCleanupThread(boolean runCleanupThread) {
this.runCleanupThread = runCleanupThread;
if (this.runCleanupThread) {
if (cleanupThread == null) {
cleanupThread = new CleanupThread(this);
cleanupThread.start();
}
} else {
if (cleanupThread != null) {
cleanupThread.stopThread();
cleanupThread = null;
}
}
}
@Override
public V get(K key) {
CacheEntry<K, V> e = map.get(key);

View File

@ -63,18 +63,19 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
private final ConcurrentHashMap<Object, CacheEntry<K,V>> map;
private final int upperWaterMark, lowerWaterMark;
private int upperWaterMark, lowerWaterMark;
private final ReentrantLock markAndSweepLock = new ReentrantLock(true);
private boolean isCleaning = false; // not volatile... piggybacked on other volatile vars
private final boolean newThreadForCleanup;
private boolean newThreadForCleanup;
private volatile boolean islive = true;
private final Stats stats = new Stats();
private final int acceptableWaterMark;
private int acceptableWaterMark;
private long oldestEntry = 0; // not volatile, only accessed in the cleaning method
private final EvictionListener<K,V> evictionListener;
private CleanupThread cleanupThread;
private boolean runCleanupThread;
private final long ramLowerWatermark, ramUpperWatermark;
private long ramLowerWatermark, ramUpperWatermark;
private final AtomicLong ramBytes = new AtomicLong(0);
public ConcurrentLRUCache(long ramLowerWatermark, long ramUpperWatermark,
@ -90,10 +91,7 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
this.lowerWaterMark = Integer.MIN_VALUE;
this.upperWaterMark = Integer.MAX_VALUE;
if (runCleanupThread) {
cleanupThread = new CleanupThread(this);
cleanupThread.start();
}
setRunCleanupThread(runCleanupThread);
}
public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark,
@ -108,10 +106,7 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
this.lowerWaterMark = lowerWaterMark;
this.acceptableWaterMark = acceptableWatermark;
this.evictionListener = evictionListener;
if (runCleanupThread) {
cleanupThread = new CleanupThread(this);
cleanupThread.start();
}
setRunCleanupThread(runCleanupThread);
this.ramLowerWatermark = Long.MIN_VALUE;
this.ramUpperWatermark = Long.MAX_VALUE;
}
@ -125,6 +120,48 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
islive = live;
}
public void setUpperWaterMark(int upperWaterMark) {
if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be >= 1");
this.upperWaterMark = upperWaterMark;
}
public void setLowerWaterMark(int lowerWaterMark) {
this.lowerWaterMark = lowerWaterMark;
}
public void setAcceptableWaterMark(int acceptableWaterMark) {
this.acceptableWaterMark = acceptableWaterMark;
}
public void setRamUpperWatermark(long ramUpperWatermark) {
if (ramUpperWatermark < 1) {
throw new IllegalArgumentException("ramUpperWaterMark must be >= 1");
}
this.ramUpperWatermark = ramUpperWatermark;
}
public void setRamLowerWatermark(long ramLowerWatermark) {
if (ramLowerWatermark < 0) {
throw new IllegalArgumentException("ramLowerWaterMark must be >= 0");
}
this.ramLowerWatermark = ramLowerWatermark;
}
public synchronized void setRunCleanupThread(boolean runCleanupThread) {
this.runCleanupThread = runCleanupThread;
if (this.runCleanupThread) {
if (cleanupThread == null) {
cleanupThread = new CleanupThread(this);
cleanupThread.start();
}
} else {
if (cleanupThread != null) {
cleanupThread.stopThread();
cleanupThread = null;
}
}
}
@Override
public V get(K key) {
CacheEntry<K,V> e = map.get(key);
@ -211,11 +248,11 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
if (!markAndSweepLock.tryLock()) return;
try {
if (upperWaterMark != Integer.MAX_VALUE) {
if (upperWaterMark < size()) {
markAndSweepByCacheSize();
} else if (ramUpperWatermark != Long.MAX_VALUE) {
} else if (ramUpperWatermark < ramBytesUsed()) {
markAndSweepByRamSize();
} else {
} else if (upperWaterMark == Integer.MAX_VALUE && ramUpperWatermark == Long.MAX_VALUE) {
// should never happen
throw new AssertionError("ConcurrentLRUCache initialized with neither size limits nor ram limits");
}

View File

@ -19,6 +19,7 @@ package org.apache.solr.search;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCase;
@ -326,6 +327,76 @@ public class TestFastLRUCache extends SolrTestCase {
}
}
public void testSetLimits() throws Exception {
FastLRUCache<String, Accountable> cache = new FastLRUCache<>();
cache.initializeMetrics(metricManager, registry, "foo", scope);
Map<String, String> params = new HashMap<>();
params.put("size", "6");
params.put("maxRamMB", "8");
CacheRegenerator cr = new NoOpRegenerator();
Object o = cache.init(params, null, cr);
for (int i = 0; i < 6; i++) {
cache.put("" + i, new Accountable() {
@Override
public long ramBytesUsed() {
return 1024 * 1024;
}
});
}
// no evictions yet
assertEquals(6, cache.size());
// this also sets minLimit = 4
cache.setResourceLimit(SolrCache.SIZE_PARAM, 5);
// should not happen yet - evictions are triggered by put
assertEquals(6, cache.size());
cache.put("6", new Accountable() {
@Override
public long ramBytesUsed() {
return 1024 * 1024;
}
});
// should evict to minLimit
assertEquals(4, cache.size());
// modify ram limit
cache.setResourceLimit(SolrCache.MAX_RAM_MB_PARAM, 3);
// should not happen yet - evictions are triggered by put
assertEquals(4, cache.size());
// this evicts down to 3MB * 0.8, ie. ramLowerWaterMark
cache.put("7", new Accountable() {
@Override
public long ramBytesUsed() {
return 0;
}
});
assertEquals(3, cache.size());
assertNotNull("5", cache.get("5"));
assertNotNull("6", cache.get("6"));
assertNotNull("7", cache.get("7"));
// scale up
cache.setResourceLimit(SolrCache.MAX_RAM_MB_PARAM, 4);
cache.put("8", new Accountable() {
@Override
public long ramBytesUsed() {
return 1024 * 1024;
}
});
assertEquals(4, cache.size());
cache.setResourceLimit(SolrCache.SIZE_PARAM, 10);
for (int i = 0; i < 6; i++) {
cache.put("new" + i, new Accountable() {
@Override
public long ramBytesUsed() {
return 0;
}
});
}
assertEquals(10, cache.size());
}
/***
public void testPerf() {
doPerfTest(1000000, 100000, 200000); // big cache, warmup

View File

@ -454,6 +454,46 @@ public class TestLFUCache extends SolrTestCaseJ4 {
}
public void testSetLimits() throws Exception {
SolrMetricManager metricManager = new SolrMetricManager();
Random r = random();
String registry = TestUtil.randomSimpleString(r, 2, 10);
String scope = TestUtil.randomSimpleString(r, 2, 10);
LFUCache<String, String> cache = new LFUCache<>();
cache.initializeMetrics(metricManager, registry, "foo", scope + ".lfuCache");
Map<String, String> params = new HashMap<>();
params.put("size", "6");
CacheRegenerator cr = new NoOpRegenerator();
Object o = cache.init(params, null, cr);
for (int i = 0; i < 6; i++) {
cache.put("" + i, "foo " + i);
}
// no evictions yet
assertEquals(6, cache.size());
// this sets minSize = 4, evictions will target minSize
cache.setResourceLimit(SolrCache.SIZE_PARAM, 5);
// should not happen yet - evictions are triggered by put
assertEquals(6, cache.size());
cache.put("6", "foo 6");
// should evict to minSize
assertEquals(4, cache.size());
// should allow adding 1 more item before hitting "size" limit
cache.put("7", "foo 7");
assertEquals(5, cache.size());
// should evict down to minSize = 4
cache.put("8", "foo 8");
assertEquals(4, cache.size());
// scale up
cache.setResourceLimit(SolrCache.SIZE_PARAM, 10);
for (int i = 0; i < 6; i++) {
cache.put("new" + i, "bar " + i);
}
assertEquals(10, cache.size());
}
// From the original LRU cache tests, they're commented out there too because they take a while.
// void doPerfTest(int iter, int cacheSize, int maxKey) {
// long start = System.currentTimeMillis();

View File

@ -24,7 +24,6 @@ import org.apache.lucene.util.Accountable;
import org.apache.solr.SolrTestCase;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.common.SolrException;
import org.apache.solr.metrics.SolrMetricManager;
/**
@ -175,16 +174,85 @@ public class TestLRUCache extends SolrTestCase {
assertEquals(RamUsageEstimator.shallowSizeOfInstance(LRUCache.class), accountableLRUCache.ramBytesUsed());
}
public void testNonAccountableValues() throws Exception {
LRUCache<String, String> cache = new LRUCache<>();
// public void testNonAccountableValues() throws Exception {
// LRUCache<String, String> cache = new LRUCache<>();
// Map<String, String> params = new HashMap<>();
// params.put("size", "5");
// params.put("maxRamMB", "1");
// CacheRegenerator cr = new NoOpRegenerator();
// Object o = cache.init(params, null, cr);
//
// expectThrows(SolrException.class, "Adding a non-accountable value to a cache configured with maxRamBytes should have failed",
// () -> cache.put("1", "1")
// );
// }
//
public void testSetLimits() throws Exception {
LRUCache<String, Accountable> cache = new LRUCache<>();
cache.initializeMetrics(metricManager, registry, "foo", scope);
Map<String, String> params = new HashMap<>();
params.put("size", "5");
params.put("maxRamMB", "1");
params.put("size", "6");
params.put("maxRamMB", "8");
CacheRegenerator cr = new NoOpRegenerator();
Object o = cache.init(params, null, cr);
for (int i = 0; i < 6; i++) {
cache.put("" + i, new Accountable() {
@Override
public long ramBytesUsed() {
return 1024 * 1024;
}
});
}
// no evictions yet
assertEquals(6, cache.size());
cache.setResourceLimit(SolrCache.SIZE_PARAM, 5);
// should not happen yet - evictions are triggered by put
assertEquals(6, cache.size());
cache.put("6", new Accountable() {
@Override
public long ramBytesUsed() {
return 1024 * 1024;
}
});
// should evict by count limit
assertEquals(5, cache.size());
expectThrows(SolrException.class, "Adding a non-accountable value to a cache configured with maxRamBytes should have failed",
() -> cache.put("1", "1")
);
// modify ram limit
cache.setResourceLimit(SolrCache.MAX_RAM_MB_PARAM, 3);
// should not happen yet - evictions are triggered by put
assertEquals(5, cache.size());
cache.put("7", new Accountable() {
@Override
public long ramBytesUsed() {
return 0;
}
});
assertEquals(3, cache.size());
assertNotNull("5", cache.get("5"));
assertNotNull("6", cache.get("6"));
assertNotNull("7", cache.get("7"));
// scale up
cache.setResourceLimit(SolrCache.MAX_RAM_MB_PARAM, 4);
cache.put("8", new Accountable() {
@Override
public long ramBytesUsed() {
return 1024 * 1024;
}
});
assertEquals(4, cache.size());
cache.setResourceLimit(SolrCache.SIZE_PARAM, 10);
for (int i = 0; i < 6; i++) {
cache.put("new" + i, new Accountable() {
@Override
public long ramBytesUsed() {
return 0;
}
});
}
assertEquals(10, cache.size());
}
}