mirror of https://github.com/apache/lucene.git
SOLR-13898: Non-atomic use of SolrCache get / put.
This commit is contained in:
parent
59cc299c7e
commit
0c3233877b
|
@ -125,6 +125,9 @@ Improvements
|
|||
|
||||
* SOLR-13844: Remove replica recovery terms with the replica term (Houston Putman via Cao Manh Dat)
|
||||
|
||||
* SOLR-13898: Use the new SolrCache.commputeIfAbsent instead of non-atomic get / put in order to
|
||||
improve cache hit ratios. (ab, Ben Manes)
|
||||
|
||||
|
||||
Optimizations
|
||||
---------------------
|
||||
|
|
|
@ -162,13 +162,14 @@ public class RptWithGeometrySpatialField extends AbstractSpatialFieldType<Compos
|
|||
throw new IllegalStateException("Leaf " + readerContext.reader() + " is not suited for caching");
|
||||
}
|
||||
PerSegCacheKey key = new PerSegCacheKey(cacheHelper.getKey(), docId);
|
||||
Shape shape = cache.get(key);
|
||||
if (shape == null) {
|
||||
shape = targetFuncValues.value();
|
||||
if (shape != null) {
|
||||
cache.put(key, shape);
|
||||
Shape shape = cache.computeIfAbsent(key, k -> {
|
||||
try {
|
||||
return targetFuncValues.value();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
});
|
||||
if (shape != null) {
|
||||
//optimize shape on a cache hit if possible. This must be thread-safe and it is.
|
||||
if (shape instanceof JtsGeometry) {
|
||||
((JtsGeometry) shape).index(); // TODO would be nice if some day we didn't have to cast
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.RemovalCause;
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener;
|
||||
|
@ -175,6 +176,18 @@ public class CaffeineCache<K, V> extends SolrCacheBase implements SolrCache<K, V
|
|||
return cache.getIfPresent(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
return cache.get(key, k -> {
|
||||
inserts.increment();
|
||||
V value = mappingFunction.apply(k);
|
||||
ramBytes.add(RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED) +
|
||||
RamUsageEstimator.sizeOfObject(value, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
|
||||
ramBytes.add(RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY);
|
||||
return value;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public V put(K key, V val) {
|
||||
inserts.increment();
|
||||
|
@ -189,6 +202,17 @@ public class CaffeineCache<K, V> extends SolrCacheBase implements SolrCache<K, V
|
|||
return old;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
V existing = cache.asMap().remove(key);
|
||||
if (existing != null) {
|
||||
ramBytes.add(- RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
|
||||
ramBytes.add(- RamUsageEstimator.sizeOfObject(existing, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
|
||||
ramBytes.add(- RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY);
|
||||
}
|
||||
return existing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
cache.invalidateAll();
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.lucene.util.Accountable;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
@ -239,11 +240,21 @@ public class FastLRUCache<K, V> extends SolrCacheBase implements SolrCache<K, V>
|
|||
return cache.put(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
return cache.remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(K key) {
|
||||
return cache.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
return cache.computeIfAbsent(key, mappingFunction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
cache.clear();
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.lucene.util.Accountable;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
@ -179,6 +180,16 @@ public class LFUCache<K, V> implements SolrCache<K, V>, Accountable {
|
|||
return cache.put(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
return cache.remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
return cache.computeIfAbsent(key, mappingFunction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(K key) {
|
||||
return cache.get(key);
|
||||
|
|
|
@ -24,7 +24,9 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.lucene.util.Accountable;
|
||||
import org.apache.lucene.util.Accountables;
|
||||
|
@ -258,6 +260,66 @@ public class LRUCache<K,V> extends SolrCacheBase implements SolrCache<K,V>, Acco
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
synchronized (map) {
|
||||
if (getState() == State.LIVE) {
|
||||
lookups++;
|
||||
stats.lookups.increment();
|
||||
}
|
||||
AtomicBoolean newEntry = new AtomicBoolean();
|
||||
CacheValue<V> entry = map.computeIfAbsent(key, k -> {
|
||||
V value = mappingFunction.apply(k);
|
||||
// preserve the semantics of computeIfAbsent
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
CacheValue<V> cacheValue = new CacheValue<>(value, timeSource.getEpochTimeNs());
|
||||
if (getState() == State.LIVE) {
|
||||
stats.inserts.increment();
|
||||
}
|
||||
if (syntheticEntries) {
|
||||
if (cacheValue.createTime < oldestEntry) {
|
||||
oldestEntry = cacheValue.createTime;
|
||||
}
|
||||
}
|
||||
// increment local inserts regardless of state???
|
||||
// it does make it more consistent with the current size...
|
||||
inserts++;
|
||||
|
||||
// important to calc and add new ram bytes first so that removeEldestEntry can compare correctly
|
||||
long keySize = RamUsageEstimator.sizeOfObject(key, QUERY_DEFAULT_RAM_BYTES_USED);
|
||||
long valueSize = RamUsageEstimator.sizeOfObject(cacheValue, QUERY_DEFAULT_RAM_BYTES_USED);
|
||||
ramBytesUsed += keySize + valueSize + LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
|
||||
newEntry.set(true);
|
||||
return cacheValue;
|
||||
});
|
||||
if (!newEntry.get()) {
|
||||
if (getState() == State.LIVE) {
|
||||
hits++;
|
||||
stats.hits.increment();
|
||||
}
|
||||
}
|
||||
return entry != null ? entry.value : null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
synchronized (map) {
|
||||
CacheValue<V> entry = map.remove(key);
|
||||
if (entry != null) {
|
||||
long delta = RamUsageEstimator.sizeOfObject(key, QUERY_DEFAULT_RAM_BYTES_USED)
|
||||
+ RamUsageEstimator.sizeOfObject(entry, QUERY_DEFAULT_RAM_BYTES_USED)
|
||||
+ LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY;
|
||||
ramBytesUsed -= delta;
|
||||
return entry.value;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public V put(K key, V value) {
|
||||
if (maxSize == Integer.MAX_VALUE && maxRamBytes == Long.MAX_VALUE) {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.search;
|
|||
import org.apache.solr.core.SolrInfoBean;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -93,6 +94,19 @@ public interface SolrCache<K,V> extends SolrInfoBean {
|
|||
/** :TODO: copy from Map */
|
||||
public V get(K key);
|
||||
|
||||
public V remove(K key);
|
||||
|
||||
/**
|
||||
* Get an existing element or atomically compute it if missing.
|
||||
* @param key key
|
||||
* @param mappingFunction function to compute the element. If the function returns a null
|
||||
* result the cache mapping will not be created. NOTE: this function
|
||||
* must NOT attempt to modify any mappings in the cache.
|
||||
* @return existing or newly computed value, null if there was no existing value and
|
||||
* it was not possible to compute a new value (in which case the new mapping won't be created).
|
||||
*/
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction);
|
||||
|
||||
/** :TODO: copy from Map */
|
||||
public void clear();
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.search;
|
|||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.solr.metrics.SolrMetricsContext;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -42,7 +43,15 @@ public class SolrCacheHolder<K, V> implements SolrCache<K,V> {
|
|||
|
||||
public V put(K key, V value) {
|
||||
return delegate.put(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
return delegate.remove(key);
|
||||
}
|
||||
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
return delegate.computeIfAbsent(key, mappingFunction);
|
||||
}
|
||||
|
||||
public V get(K key) {
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -218,27 +219,35 @@ public class SolrDocumentFetcher {
|
|||
public Document doc(int i, Set<String> fields) throws IOException {
|
||||
Document d;
|
||||
if (documentCache != null) {
|
||||
d = documentCache.get(i);
|
||||
if (d != null) return d;
|
||||
final Set<String> getFields = enableLazyFieldLoading ? fields : null;
|
||||
AtomicReference<IOException> exceptionRef = new AtomicReference<>();
|
||||
d = documentCache.computeIfAbsent(i, docId -> {
|
||||
try {
|
||||
return docNC(docId, getFields);
|
||||
} catch (IOException e) {
|
||||
exceptionRef.set(e);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
if (exceptionRef.get() != null) {
|
||||
throw exceptionRef.get();
|
||||
}
|
||||
if (d == null) {
|
||||
// failed to retrieve due to an earlier exception, try again?
|
||||
return docNC(i, fields);
|
||||
} else {
|
||||
return d;
|
||||
}
|
||||
} else {
|
||||
return docNC(i, fields);
|
||||
}
|
||||
}
|
||||
|
||||
private Document docNC(int i, Set<String> fields) throws IOException {
|
||||
final DirectoryReader reader = searcher.getIndexReader();
|
||||
if (documentCache != null && !enableLazyFieldLoading) {
|
||||
// we do not filter the fields in this case because that would return an incomplete document which would
|
||||
// be eventually cached. The alternative would be to read the stored fields twice; once with the fields
|
||||
// and then without for caching leading to a performance hit
|
||||
// see SOLR-8858 for related discussion
|
||||
fields = null;
|
||||
}
|
||||
final SolrDocumentStoredFieldVisitor visitor = new SolrDocumentStoredFieldVisitor(fields, reader, i);
|
||||
reader.document(i, visitor);
|
||||
d = visitor.getDocument();
|
||||
|
||||
if (documentCache != null) {
|
||||
documentCache.put(i, d);
|
||||
}
|
||||
|
||||
return d;
|
||||
return visitor.getDocument();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -334,13 +343,12 @@ public class SolrDocumentFetcher {
|
|||
/** @see SolrIndexSearcher#doc(int, StoredFieldVisitor) */
|
||||
public void doc(int docId, StoredFieldVisitor visitor) throws IOException {
|
||||
if (documentCache != null) {
|
||||
Document cached = documentCache.get(docId);
|
||||
if (cached != null) {
|
||||
visitFromCached(cached, visitor);
|
||||
return;
|
||||
}
|
||||
// get cached document or retrieve it including all fields (and cache it)
|
||||
Document cached = doc(docId);
|
||||
visitFromCached(cached, visitor);
|
||||
} else {
|
||||
searcher.getIndexReader().document(docId, visitor);
|
||||
}
|
||||
searcher.getIndexReader().document(docId, visitor);
|
||||
}
|
||||
|
||||
/** Executes a stored field visitor against a hit from the document cache */
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
|
@ -564,66 +565,84 @@ public class UnInvertedField extends DocTermOrds {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static UnInvertedField getUnInvertedField(String field, SolrIndexSearcher searcher) throws IOException {
|
||||
SolrCache cache = searcher.getFieldValueCache();
|
||||
SolrCache<String, UnInvertedField> cache = searcher.getFieldValueCache();
|
||||
if (cache == null) {
|
||||
return new UnInvertedField(field, searcher);
|
||||
}
|
||||
|
||||
Boolean doWait = false;
|
||||
synchronized (cache) {
|
||||
final Object val = cache.get(field);
|
||||
if (val == null || (val instanceof Throwable)) {
|
||||
/**
|
||||
* We use this place holder object to pull the UninvertedField construction out of the sync
|
||||
* so that if many fields are accessed in a short time, the UninvertedField can be
|
||||
* built for these fields in parallel rather than sequentially.
|
||||
*/
|
||||
cache.put(field, uifPlaceholder);
|
||||
} else {
|
||||
if (val != uifPlaceholder) {
|
||||
return (UnInvertedField) val;
|
||||
}
|
||||
doWait = true; // Someone else has put the place holder in, wait for that to complete.
|
||||
}
|
||||
}
|
||||
while (doWait) {
|
||||
AtomicReference<Throwable> throwableRef = new AtomicReference<>();
|
||||
UnInvertedField uif = cache.computeIfAbsent(field, f -> {
|
||||
UnInvertedField newUif;
|
||||
try {
|
||||
synchronized (cache) {
|
||||
final Object val = cache.get(field);
|
||||
if (val != uifPlaceholder) { // OK, another thread put this in the cache we should be good.
|
||||
if (val instanceof Throwable) {
|
||||
rethrowAsSolrException(field, (Throwable) val);
|
||||
} else {
|
||||
return (UnInvertedField) val;
|
||||
}
|
||||
}
|
||||
cache.wait();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
rethrowAsSolrException(field, e);
|
||||
newUif = new UnInvertedField(field, searcher);
|
||||
} catch (Throwable t) {
|
||||
throwableRef.set(t);
|
||||
newUif = null;
|
||||
}
|
||||
}
|
||||
|
||||
UnInvertedField uif = null;
|
||||
try {
|
||||
uif = new UnInvertedField(field, searcher);
|
||||
}catch(Throwable e) {
|
||||
synchronized (cache) {
|
||||
cache.put(field, e); // signaling the failure
|
||||
cache.notifyAll();
|
||||
}
|
||||
rethrowAsSolrException(field, e);
|
||||
}
|
||||
synchronized (cache) {
|
||||
cache.put(field, uif); // Note, this cleverly replaces the placeholder.
|
||||
cache.notifyAll();
|
||||
return newUif;
|
||||
});
|
||||
if (throwableRef.get() != null) {
|
||||
rethrowAsSolrException(field, throwableRef.get());
|
||||
}
|
||||
return uif;
|
||||
|
||||
// (ab) if my understanding is correct this whole block tried to mimic the
|
||||
// semantics of computeIfAbsent
|
||||
|
||||
// Boolean doWait = false;
|
||||
// synchronized (cache) {
|
||||
// final Object val = cache.get(field);
|
||||
// if (val == null || (val instanceof Throwable)) {
|
||||
// /**
|
||||
// * We use this place holder object to pull the UninvertedField construction out of the sync
|
||||
// * so that if many fields are accessed in a short time, the UninvertedField can be
|
||||
// * built for these fields in parallel rather than sequentially.
|
||||
// */
|
||||
// cache.put(field, uifPlaceholder);
|
||||
// } else {
|
||||
// if (val != uifPlaceholder) {
|
||||
// return (UnInvertedField) val;
|
||||
// }
|
||||
// doWait = true; // Someone else has put the place holder in, wait for that to complete.
|
||||
// }
|
||||
// }
|
||||
// while (doWait) {
|
||||
// try {
|
||||
// synchronized (cache) {
|
||||
// final Object val = cache.get(field);
|
||||
// if (val != uifPlaceholder) { // OK, another thread put this in the cache we should be good.
|
||||
// if (val instanceof Throwable) {
|
||||
// rethrowAsSolrException(field, (Throwable) val);
|
||||
// } else {
|
||||
// return (UnInvertedField) val;
|
||||
// }
|
||||
// }
|
||||
// cache.wait();
|
||||
// }
|
||||
// } catch (InterruptedException e) {
|
||||
// rethrowAsSolrException(field, e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// UnInvertedField uif = null;
|
||||
// try {
|
||||
// uif = new UnInvertedField(field, searcher);
|
||||
// }catch(Throwable e) {
|
||||
// synchronized (cache) {
|
||||
// cache.put(field, e); // signaling the failure
|
||||
// cache.notifyAll();
|
||||
// }
|
||||
// rethrowAsSolrException(field, e);
|
||||
// }
|
||||
// synchronized (cache) {
|
||||
// cache.put(field, uif); // Note, this cleverly replaces the placeholder.
|
||||
// cache.notifyAll();
|
||||
// }
|
||||
// return uif;
|
||||
}
|
||||
|
||||
protected static void rethrowAsSolrException(String field, Throwable e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Exception occurs during uninverting "+field, e);
|
||||
"Exception occured during uninverting " + field, e);
|
||||
}
|
||||
|
||||
// Returns null if not already populated
|
||||
|
|
|
@ -85,20 +85,21 @@ public class BlockJoinParentQParser extends FiltersQParser {
|
|||
}
|
||||
|
||||
public static BitDocIdSetFilterWrapper getCachedFilter(final SolrQueryRequest request, Query parentList) {
|
||||
SolrCache parentCache = request.getSearcher().getCache(CACHE_NAME);
|
||||
SolrCache<Query, Filter> parentCache = request.getSearcher().getCache(CACHE_NAME);
|
||||
// lazily retrieve from solr cache
|
||||
Filter filter = null;
|
||||
if (parentCache != null) {
|
||||
filter = (Filter) parentCache.get(parentList);
|
||||
}
|
||||
BitDocIdSetFilterWrapper result;
|
||||
if (filter instanceof BitDocIdSetFilterWrapper) {
|
||||
result = (BitDocIdSetFilterWrapper) filter;
|
||||
} else {
|
||||
result = new BitDocIdSetFilterWrapper(createParentFilter(parentList));
|
||||
if (parentCache != null) {
|
||||
if (parentCache != null) {
|
||||
Filter filter = parentCache.computeIfAbsent(parentList,
|
||||
query -> new BitDocIdSetFilterWrapper(createParentFilter(query)));
|
||||
if (filter instanceof BitDocIdSetFilterWrapper) {
|
||||
result = (BitDocIdSetFilterWrapper) filter;
|
||||
} else {
|
||||
result = new BitDocIdSetFilterWrapper(createParentFilter(parentList));
|
||||
// non-atomic update of existing entry to ensure strong-typing
|
||||
parentCache.put(parentList, result);
|
||||
}
|
||||
} else {
|
||||
result = new BitDocIdSetFilterWrapper(createParentFilter(parentList));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -24,9 +24,11 @@ import java.util.TreeSet;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
//import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.lucene.util.Accountable;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
@ -165,6 +167,46 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
|
|||
return e != null ? e.value : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
// prescreen access first
|
||||
V val = get(key);
|
||||
if (val != null) {
|
||||
return val;
|
||||
}
|
||||
AtomicBoolean newValue = new AtomicBoolean();
|
||||
if (islive) {
|
||||
stats.accessCounter.increment();
|
||||
}
|
||||
CacheEntry<K, V> entry = map.computeIfAbsent(key, k -> {
|
||||
V value = mappingFunction.apply(key);
|
||||
// preserve the semantics of computeIfAbsent
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
CacheEntry<K, V> e = new CacheEntry<>(key, value, timeSource.getEpochTimeNs());
|
||||
newValue.set(true);
|
||||
oldestEntry.updateAndGet(x -> x > e.lastAccessed || x == 0 ? e.lastAccessed : x);
|
||||
stats.size.increment();
|
||||
ramBytes.add(e.ramBytesUsed() + HASHTABLE_RAM_BYTES_PER_ENTRY); // added key + value + entry
|
||||
if (islive) {
|
||||
stats.putCounter.increment();
|
||||
} else {
|
||||
stats.nonLivePutCounter.increment();
|
||||
}
|
||||
return e;
|
||||
});
|
||||
if (newValue.get()) {
|
||||
maybeMarkAndSweep();
|
||||
} else {
|
||||
if (islive && entry != null) {
|
||||
entry.lastAccessed = timeSource.getEpochTimeNs();
|
||||
entry.hits.increment();
|
||||
}
|
||||
}
|
||||
return entry != null ? entry.value : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
CacheEntry<K, V> cacheEntry = map.remove(key);
|
||||
|
@ -183,6 +225,8 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
|
|||
return putCacheEntry(e);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Visible for testing to create synthetic cache entries.
|
||||
* @lucene.internal
|
||||
|
@ -192,13 +236,10 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
|
|||
// initialize oldestEntry
|
||||
oldestEntry.updateAndGet(x -> x > e.lastAccessed || x == 0 ? e.lastAccessed : x);
|
||||
CacheEntry<K, V> oldCacheEntry = map.put(e.key, e);
|
||||
int currentSize;
|
||||
if (oldCacheEntry == null) {
|
||||
stats.size.increment();
|
||||
currentSize = stats.size.intValue();
|
||||
ramBytes.add(e.ramBytesUsed() + HASHTABLE_RAM_BYTES_PER_ENTRY); // added key + value + entry
|
||||
} else {
|
||||
currentSize = stats.size.intValue();
|
||||
ramBytes.add(-oldCacheEntry.ramBytesUsed());
|
||||
ramBytes.add(e.ramBytesUsed());
|
||||
}
|
||||
|
@ -207,7 +248,11 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
|
|||
} else {
|
||||
stats.nonLivePutCounter.increment();
|
||||
}
|
||||
maybeMarkAndSweep();
|
||||
return oldCacheEntry == null ? null : oldCacheEntry.value;
|
||||
}
|
||||
|
||||
private void maybeMarkAndSweep() {
|
||||
// Check if we need to clear out old entries from the cache.
|
||||
// isCleaning variable is checked instead of markAndSweepLock.isLocked()
|
||||
// for performance because every put invokation will check until
|
||||
|
@ -219,6 +264,7 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
|
|||
// Thread safety note: isCleaning read is piggybacked (comes after) other volatile reads
|
||||
// in this method.
|
||||
boolean evictByIdleTime = maxIdleTimeNs != Long.MAX_VALUE;
|
||||
int currentSize = stats.size.intValue();
|
||||
long idleCutoff = evictByIdleTime ? timeSource.getEpochTimeNs() - maxIdleTimeNs : -1L;
|
||||
if ((currentSize > upperWaterMark || (evictByIdleTime && oldestEntry.get() < idleCutoff)) && !isCleaning) {
|
||||
if (newThreadForCleanup) {
|
||||
|
@ -229,7 +275,6 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V>, Accountable {
|
|||
markAndSweep();
|
||||
}
|
||||
}
|
||||
return oldCacheEntry == null ? null : oldCacheEntry.value;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -33,10 +33,12 @@ import java.util.TreeSet;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
//import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.apache.lucene.util.RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
|
||||
import static org.apache.lucene.util.RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED;
|
||||
|
@ -189,7 +191,7 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
|
||||
@Override
|
||||
public V get(K key) {
|
||||
CacheEntry<K,V> e = map.get(key);
|
||||
CacheEntry<K, V> e = map.get(key);
|
||||
if (e == null) {
|
||||
if (islive) stats.missCounter.increment();
|
||||
return null;
|
||||
|
@ -200,7 +202,7 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
|
||||
@Override
|
||||
public V remove(K key) {
|
||||
CacheEntry<K,V> cacheEntry = map.remove(key);
|
||||
CacheEntry<K, V> cacheEntry = map.remove(key);
|
||||
if (cacheEntry != null) {
|
||||
stats.size.decrement();
|
||||
ramBytes.add(-cacheEntry.ramBytesUsed() - HASHTABLE_RAM_BYTES_PER_ENTRY);
|
||||
|
@ -209,6 +211,42 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
|
||||
// prescreen access first
|
||||
V val = get(key);
|
||||
if (val != null) {
|
||||
return val;
|
||||
}
|
||||
AtomicBoolean newEntry = new AtomicBoolean();
|
||||
CacheEntry<K, V> entry = map.computeIfAbsent(key, k -> {
|
||||
V value = mappingFunction.apply(key);
|
||||
// preserve the semantics of computeIfAbsent
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
CacheEntry<K, V> e = new CacheEntry<>(key, value, timeSource.getEpochTimeNs(), stats.accessCounter.incrementAndGet());
|
||||
oldestEntryNs.updateAndGet(x -> x > e.createTime || x == 0 ? e.createTime : x);
|
||||
stats.size.increment();
|
||||
ramBytes.add(e.ramBytesUsed() + HASHTABLE_RAM_BYTES_PER_ENTRY); // added key + value + entry
|
||||
if (islive) {
|
||||
stats.putCounter.increment();
|
||||
} else {
|
||||
stats.nonLivePutCounter.increment();
|
||||
}
|
||||
newEntry.set(true);
|
||||
return e;
|
||||
});
|
||||
if (newEntry.get()) {
|
||||
maybeMarkAndSweep();
|
||||
} else {
|
||||
if (islive && entry != null) {
|
||||
entry.lastAccessed = stats.accessCounter.incrementAndGet();
|
||||
}
|
||||
}
|
||||
return entry != null ? entry.value : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V put(K key, V val) {
|
||||
if (val == null) return null;
|
||||
|
@ -224,13 +262,10 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
// initialize oldestEntryNs
|
||||
oldestEntryNs.updateAndGet(x -> x > e.createTime || x == 0 ? e.createTime : x);
|
||||
CacheEntry<K,V> oldCacheEntry = map.put(e.key, e);
|
||||
int currentSize;
|
||||
if (oldCacheEntry == null) {
|
||||
stats.size.increment();
|
||||
currentSize = stats.size.intValue();
|
||||
ramBytes.add(e.ramBytesUsed() + HASHTABLE_RAM_BYTES_PER_ENTRY); // added key + value + entry
|
||||
} else {
|
||||
currentSize = stats.size.intValue();
|
||||
ramBytes.add(-oldCacheEntry.ramBytesUsed());
|
||||
ramBytes.add(e.ramBytesUsed());
|
||||
}
|
||||
|
@ -239,7 +274,11 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
} else {
|
||||
stats.nonLivePutCounter.increment();
|
||||
}
|
||||
maybeMarkAndSweep();
|
||||
return oldCacheEntry == null ? null : oldCacheEntry.value;
|
||||
}
|
||||
|
||||
private void maybeMarkAndSweep() {
|
||||
// Check if we need to clear out old entries from the cache.
|
||||
// isCleaning variable is checked instead of markAndSweepLock.isLocked()
|
||||
// for performance because every put invocation will check until
|
||||
|
@ -251,6 +290,7 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
// Thread safety note: isCleaning read is piggybacked (comes after) other volatile reads
|
||||
// in this method.
|
||||
long idleCutoff = timeSource.getEpochTimeNs() - maxIdleTimeNs;
|
||||
int currentSize = stats.size.intValue();
|
||||
if ((currentSize > upperWaterMark || ramBytes.sum() > ramUpperWatermark || oldestEntryNs.get() < idleCutoff) && !isCleaning) {
|
||||
if (newThreadForCleanup) {
|
||||
new Thread(this::markAndSweep).start();
|
||||
|
@ -260,7 +300,6 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V>, Accountable {
|
|||
markAndSweep();
|
||||
}
|
||||
}
|
||||
return oldCacheEntry == null ? null : oldCacheEntry.value;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.search;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.metrics.SolrMetricManager;
|
||||
import org.apache.solr.metrics.SolrMetricsContext;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@LuceneTestCase.Slow
|
||||
public class TestSolrCachePerf extends SolrTestCaseJ4 {
|
||||
|
||||
private static final Class<? extends SolrCache>[] IMPLS = new Class[] {
|
||||
CaffeineCache.class,
|
||||
LRUCache.class,
|
||||
LFUCache.class,
|
||||
FastLRUCache.class
|
||||
};
|
||||
|
||||
private final int NUM_KEYS = 5000;
|
||||
private final String[] keys = new String[NUM_KEYS];
|
||||
|
||||
@Before
|
||||
public void setupKeys() {
|
||||
for (int i = 0; i < NUM_KEYS; i++) {
|
||||
keys[i] = String.valueOf(random().nextInt(100));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPutCompute() throws Exception {
|
||||
Map<String, SummaryStatistics> getPutRatio = new HashMap<>();
|
||||
Map<String, SummaryStatistics> computeRatio = new HashMap<>();
|
||||
Map<String, SummaryStatistics> getPutTime = new HashMap<>();
|
||||
Map<String, SummaryStatistics> computeTime = new HashMap<>();
|
||||
// warm-up
|
||||
int threads = 10;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
doTestGetPutCompute(new HashMap<String, SummaryStatistics>(), new HashMap<String, SummaryStatistics>(), threads, false);
|
||||
doTestGetPutCompute(new HashMap<String, SummaryStatistics>(), new HashMap<String, SummaryStatistics>(), threads, true);
|
||||
}
|
||||
for (int i = 0; i < 100; i++) {
|
||||
doTestGetPutCompute(getPutRatio, getPutTime, threads, false);
|
||||
doTestGetPutCompute(computeRatio, computeTime, threads, true);
|
||||
}
|
||||
computeRatio.forEach((type, computeStats) -> {
|
||||
SummaryStatistics getPutStats = getPutRatio.get(type);
|
||||
assertTrue("compute ratio (" + computeStats.getMean() + ") should be higher or equal from get/put ("
|
||||
+ getPutStats.getMean() + ")", computeStats.getMean() >= getPutStats.getMean());
|
||||
});
|
||||
}
|
||||
|
||||
static final String VALUE = "foo";
|
||||
|
||||
private void doTestGetPutCompute(Map<String, SummaryStatistics> ratioStats, Map<String, SummaryStatistics> timeStats, int numThreads, boolean useCompute) throws Exception {
|
||||
for (Class<? extends SolrCache> clazz : IMPLS) {
|
||||
SolrMetricManager metricManager = new SolrMetricManager();
|
||||
SolrCache<String, String> cache = clazz.getDeclaredConstructor().newInstance();
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("size", "" + NUM_KEYS);
|
||||
CacheRegenerator cr = new NoOpRegenerator();
|
||||
Object o = cache.init(params, null, cr);
|
||||
cache.setState(SolrCache.State.LIVE);
|
||||
cache.initializeMetrics(new SolrMetricsContext(metricManager, "foo", "bar"), "foo");
|
||||
AtomicBoolean stop = new AtomicBoolean();
|
||||
SummaryStatistics perImplRatio = ratioStats.computeIfAbsent(clazz.getSimpleName(), c -> new SummaryStatistics());
|
||||
SummaryStatistics perImplTime = timeStats.computeIfAbsent(clazz.getSimpleName(), c -> new SummaryStatistics());
|
||||
CountDownLatch startLatch = new CountDownLatch(1);
|
||||
CountDownLatch stopLatch = new CountDownLatch(numThreads * NUM_KEYS);
|
||||
List<Thread> runners = new ArrayList<>();
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
startLatch.await();
|
||||
int ik = 0;
|
||||
while (!stop.get()) {
|
||||
String key = keys[ik % NUM_KEYS];
|
||||
ik++;
|
||||
if (useCompute) {
|
||||
String value = cache.computeIfAbsent(key, k -> VALUE);
|
||||
assertNotNull(value);
|
||||
} else {
|
||||
String value = cache.get(key);
|
||||
if (value == null) {
|
||||
// increase a likelihood of context switch
|
||||
Thread.yield();
|
||||
cache.put(key, VALUE);
|
||||
}
|
||||
}
|
||||
Thread.yield();
|
||||
stopLatch.countDown();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
fail(e.toString());
|
||||
return;
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
runners.add(t);
|
||||
}
|
||||
// fire them up
|
||||
long startTime = System.nanoTime();
|
||||
startLatch.countDown();
|
||||
stopLatch.await();
|
||||
stop.set(true);
|
||||
for (Thread t : runners) {
|
||||
t.join();
|
||||
}
|
||||
long stopTime = System.nanoTime();
|
||||
Map<String, Object> metrics = cache.getSolrMetricsContext().getMetricsSnapshot();
|
||||
perImplRatio.addValue(
|
||||
Double.parseDouble(String.valueOf(metrics.get("CACHE.foo.hitratio"))));
|
||||
perImplTime.addValue((double)(stopTime - startTime));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue