HHH-15305 Update custom LIRS implementation based on BoundedConcurrentHashMap
This commit is contained in:
parent
2143ced49e
commit
7e5c3c8dde
|
@ -34,6 +34,7 @@ import java.util.NoSuchElementException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -330,6 +331,8 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
static final class LRU<K, V> extends LinkedHashMap<HashEntry<K, V>, V> implements EvictionPolicy<K, V> {
|
static final class LRU<K, V> extends LinkedHashMap<HashEntry<K, V>, V> implements EvictionPolicy<K, V> {
|
||||||
|
|
||||||
private final ConcurrentLinkedQueue<HashEntry<K, V>> accessQueue;
|
private final ConcurrentLinkedQueue<HashEntry<K, V>> accessQueue;
|
||||||
|
|
||||||
|
private final AtomicLong accessQueueSize;
|
||||||
private final Segment<K, V> segment;
|
private final Segment<K, V> segment;
|
||||||
private final int maxBatchQueueSize;
|
private final int maxBatchQueueSize;
|
||||||
private final int trimDownSize;
|
private final int trimDownSize;
|
||||||
|
@ -344,19 +347,29 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
this.batchThresholdFactor = batchThresholdFactor;
|
this.batchThresholdFactor = batchThresholdFactor;
|
||||||
this.accessQueue = new ConcurrentLinkedQueue<>();
|
this.accessQueue = new ConcurrentLinkedQueue<>();
|
||||||
this.evicted = new HashSet<>();
|
this.evicted = new HashSet<>();
|
||||||
|
this.accessQueueSize = new AtomicLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute() {
|
public void execute() {
|
||||||
for ( HashEntry<K, V> e : accessQueue ) {
|
assert segment.isHeldByCurrentThread();
|
||||||
put( e, e.value );
|
long removed = 0;
|
||||||
|
HashEntry<K, V> e;
|
||||||
|
try {
|
||||||
|
while ((e = accessQueue.poll()) != null) {
|
||||||
|
removed++;
|
||||||
|
put(e, e.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
// guarantee that under OOM size won't be broken
|
||||||
|
accessQueueSize.addAndGet(-removed);
|
||||||
}
|
}
|
||||||
accessQueue.clear();
|
|
||||||
evicted.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEntryMiss(HashEntry<K, V> e) {
|
public void onEntryMiss(HashEntry<K, V> e) {
|
||||||
|
assert segment.isHeldByCurrentThread();
|
||||||
put( e, e.value );
|
put( e, e.value );
|
||||||
if ( !evicted.isEmpty() ) {
|
if ( !evicted.isEmpty() ) {
|
||||||
evicted.clear();
|
evicted.clear();
|
||||||
|
@ -369,7 +382,11 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
@Override
|
@Override
|
||||||
public boolean onEntryHit(HashEntry<K, V> e) {
|
public boolean onEntryHit(HashEntry<K, V> e) {
|
||||||
accessQueue.add( e );
|
accessQueue.add( e );
|
||||||
return accessQueue.size() >= maxBatchQueueSize * batchThresholdFactor;
|
// counter-intuitive:
|
||||||
|
// Why not placing this *before* appending the entry to the access queue?
|
||||||
|
// we don't want the eviction to kick-in if the access queue doesn't contain enough entries.
|
||||||
|
final long size = accessQueueSize.incrementAndGet();
|
||||||
|
return size >= maxBatchQueueSize * batchThresholdFactor;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -377,22 +394,30 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean thresholdExpired() {
|
public boolean thresholdExpired() {
|
||||||
return accessQueue.size() >= maxBatchQueueSize;
|
return accessQueueSize.get() >= maxBatchQueueSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEntryRemove(HashEntry<K, V> e) {
|
public void onEntryRemove(HashEntry<K, V> e) {
|
||||||
|
assert segment.isHeldByCurrentThread();
|
||||||
remove( e );
|
remove( e );
|
||||||
// we could have multiple instances of e in accessQueue; remove them all
|
// we could have multiple instances of e in accessQueue; remove them all
|
||||||
|
long removed = 0;
|
||||||
while ( accessQueue.remove( e ) ) {
|
while ( accessQueue.remove( e ) ) {
|
||||||
continue;
|
removed--;
|
||||||
}
|
}
|
||||||
|
accessQueueSize.addAndGet(-removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
|
assert segment.isHeldByCurrentThread();
|
||||||
super.clear();
|
super.clear();
|
||||||
accessQueue.clear();
|
long removed = 0;
|
||||||
|
while (accessQueue.poll() != null) {
|
||||||
|
removed++;
|
||||||
|
}
|
||||||
|
accessQueueSize.addAndGet(-removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -405,6 +430,7 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean removeEldestEntry(Map.Entry<HashEntry<K, V>, V> eldest) {
|
protected boolean removeEldestEntry(Map.Entry<HashEntry<K, V>, V> eldest) {
|
||||||
|
assert segment.isHeldByCurrentThread();
|
||||||
boolean aboveThreshold = isAboveThreshold();
|
boolean aboveThreshold = isAboveThreshold();
|
||||||
if ( aboveThreshold ) {
|
if ( aboveThreshold ) {
|
||||||
HashEntry<K, V> evictedEntry = eldest.getKey();
|
HashEntry<K, V> evictedEntry = eldest.getKey();
|
||||||
|
@ -839,6 +865,8 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
*/
|
*/
|
||||||
private final ConcurrentLinkedQueue<LIRSHashEntry<K, V>> accessQueue;
|
private final ConcurrentLinkedQueue<LIRSHashEntry<K, V>> accessQueue;
|
||||||
|
|
||||||
|
private final AtomicLong accessQueueSize;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maxBatchQueueSize
|
* The maxBatchQueueSize
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -898,6 +926,7 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
this.maxBatchQueueSize = maxBatchSize > MAX_BATCH_SIZE ? MAX_BATCH_SIZE : maxBatchSize;
|
this.maxBatchQueueSize = maxBatchSize > MAX_BATCH_SIZE ? MAX_BATCH_SIZE : maxBatchSize;
|
||||||
this.batchThresholdFactor = batchThresholdFactor;
|
this.batchThresholdFactor = batchThresholdFactor;
|
||||||
this.accessQueue = new ConcurrentLinkedQueue<LIRSHashEntry<K, V>>();
|
this.accessQueue = new ConcurrentLinkedQueue<LIRSHashEntry<K, V>>();
|
||||||
|
this.accessQueueSize = new AtomicLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int calculateLIRSize(int maximumSize) {
|
private static int calculateLIRSize(int maximumSize) {
|
||||||
|
@ -907,9 +936,13 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute() {
|
public void execute() {
|
||||||
|
assert segment.isHeldByCurrentThread();
|
||||||
Set<HashEntry<K, V>> evicted = new HashSet<>();
|
Set<HashEntry<K, V>> evicted = new HashSet<>();
|
||||||
|
long removed = 0;
|
||||||
try {
|
try {
|
||||||
for ( LIRSHashEntry<K, V> e : accessQueue ) {
|
LIRSHashEntry<K, V> e;
|
||||||
|
while ( (e = accessQueue.poll()) != null ) {
|
||||||
|
removed++;
|
||||||
if ( e.isResident() ) {
|
if ( e.isResident() ) {
|
||||||
e.hit( evicted );
|
e.hit( evicted );
|
||||||
}
|
}
|
||||||
|
@ -917,7 +950,8 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
removeFromSegment( evicted );
|
removeFromSegment( evicted );
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
accessQueue.clear();
|
// guarantee that under OOM size won't be broken
|
||||||
|
accessQueueSize.addAndGet(-removed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -968,7 +1002,11 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
@Override
|
@Override
|
||||||
public boolean onEntryHit(HashEntry<K, V> e) {
|
public boolean onEntryHit(HashEntry<K, V> e) {
|
||||||
accessQueue.add( (LIRSHashEntry<K, V>) e );
|
accessQueue.add( (LIRSHashEntry<K, V>) e );
|
||||||
return accessQueue.size() >= maxBatchQueueSize * batchThresholdFactor;
|
// counter-intuitive:
|
||||||
|
// Why not placing this *before* appending the entry to the access queue?
|
||||||
|
// we don't want the eviction to kick-in if the access queue doesn't contain enough entries.
|
||||||
|
final long size = accessQueueSize.incrementAndGet();
|
||||||
|
return size >= maxBatchQueueSize * batchThresholdFactor;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -976,21 +1014,29 @@ public class BoundedConcurrentHashMap<K, V> extends AbstractMap<K, V>
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean thresholdExpired() {
|
public boolean thresholdExpired() {
|
||||||
return accessQueue.size() >= maxBatchQueueSize;
|
return accessQueueSize.get() >= maxBatchQueueSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEntryRemove(HashEntry<K, V> e) {
|
public void onEntryRemove(HashEntry<K, V> e) {
|
||||||
|
assert segment.isHeldByCurrentThread();
|
||||||
( (LIRSHashEntry<K, V>) e ).remove();
|
( (LIRSHashEntry<K, V>) e ).remove();
|
||||||
|
long removed = 0;
|
||||||
// we could have multiple instances of e in accessQueue; remove them all
|
// we could have multiple instances of e in accessQueue; remove them all
|
||||||
while ( accessQueue.remove( e ) ) {
|
while ( accessQueue.remove( e ) ) {
|
||||||
|
removed++;
|
||||||
}
|
}
|
||||||
|
accessQueueSize.addAndGet(-removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
accessQueue.clear();
|
assert segment.isHeldByCurrentThread();
|
||||||
|
long removed = 0;
|
||||||
|
while (accessQueue.poll() != null) {
|
||||||
|
removed++;
|
||||||
|
}
|
||||||
|
accessQueueSize.addAndGet(-removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue