Add support for expiration after write to Cache
This commit adds supports for expiration after writes to Cache. This enables entries to expire after they were initially placed in the cache without prolonging their life on retrieval. Replacements are considered new writes.
This commit is contained in:
parent
5d340f5e6e
commit
64727b78de
|
@ -60,7 +60,10 @@ import java.util.function.ToLongBiFunction;
|
||||||
*/
|
*/
|
||||||
public class Cache<K, V> {
|
public class Cache<K, V> {
|
||||||
// positive if entries have an expiration
|
// positive if entries have an expiration
|
||||||
private long expireAfter = -1;
|
private long expireAfterAccess = -1;
|
||||||
|
|
||||||
|
// positive if entries have an expiration after write
|
||||||
|
private long expireAfterWrite = -1;
|
||||||
|
|
||||||
// the number of entries in the cache
|
// the number of entries in the cache
|
||||||
private int count = 0;
|
private int count = 0;
|
||||||
|
@ -82,11 +85,11 @@ public class Cache<K, V> {
|
||||||
Cache() {
|
Cache() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void setExpireAfter(long expireAfter) {
|
void setExpireAfterAccess(long expireAfterAccess) {
|
||||||
if (expireAfter <= 0) {
|
if (expireAfterAccess <= 0) {
|
||||||
throw new IllegalArgumentException("expireAfter <= 0");
|
throw new IllegalArgumentException("expireAfterAccess <= 0");
|
||||||
}
|
}
|
||||||
this.expireAfter = expireAfter;
|
this.expireAfterAccess = expireAfterAccess;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setMaximumWeight(long maximumWeight) {
|
void setMaximumWeight(long maximumWeight) {
|
||||||
|
@ -112,7 +115,11 @@ public class Cache<K, V> {
|
||||||
*/
|
*/
|
||||||
protected long now() {
|
protected long now() {
|
||||||
// System.nanoTime takes non-negligible time, so we only use it if we need it
|
// System.nanoTime takes non-negligible time, so we only use it if we need it
|
||||||
return expireAfter == -1 ? 0 : System.nanoTime();
|
return expireAfterAccess == -1 ? 0 : System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExpireAfterWrite(long expireAfterWrite) {
|
||||||
|
this.expireAfterWrite = expireAfterWrite;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the state of an entry in the LRU list
|
// the state of an entry in the LRU list
|
||||||
|
@ -121,15 +128,16 @@ public class Cache<K, V> {
|
||||||
static class Entry<K, V> {
|
static class Entry<K, V> {
|
||||||
final K key;
|
final K key;
|
||||||
final V value;
|
final V value;
|
||||||
|
long writeTime;
|
||||||
long accessTime;
|
long accessTime;
|
||||||
Entry<K, V> before;
|
Entry<K, V> before;
|
||||||
Entry<K, V> after;
|
Entry<K, V> after;
|
||||||
State state = State.NEW;
|
State state = State.NEW;
|
||||||
|
|
||||||
public Entry(K key, V value, long accessTime) {
|
public Entry(K key, V value, long writeTime) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.value = value;
|
this.value = value;
|
||||||
this.accessTime = accessTime;
|
this.writeTime = this.accessTime = writeTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -566,7 +574,8 @@ public class Cache<K, V> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isExpired(Entry<K, V> entry, long now) {
|
private boolean isExpired(Entry<K, V> entry, long now) {
|
||||||
return expireAfter != -1 && now - entry.accessTime > expireAfter;
|
return (expireAfterAccess != -1 && now - entry.accessTime > expireAfterAccess) ||
|
||||||
|
(expireAfterWrite != -1 && now - entry.writeTime > expireAfterWrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean unlink(Entry<K, V> entry) {
|
private boolean unlink(Entry<K, V> entry) {
|
||||||
|
|
|
@ -24,7 +24,8 @@ import java.util.function.ToLongBiFunction;
|
||||||
|
|
||||||
public class CacheBuilder<K, V> {
|
public class CacheBuilder<K, V> {
|
||||||
private long maximumWeight = -1;
|
private long maximumWeight = -1;
|
||||||
private long expireAfter = -1;
|
private long expireAfterAccess = -1;
|
||||||
|
private long expireAfterWrite = -1;
|
||||||
private ToLongBiFunction<K, V> weigher;
|
private ToLongBiFunction<K, V> weigher;
|
||||||
private RemovalListener<K, V> removalListener;
|
private RemovalListener<K, V> removalListener;
|
||||||
|
|
||||||
|
@ -43,11 +44,19 @@ public class CacheBuilder<K, V> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CacheBuilder<K, V> setExpireAfter(long expireAfter) {
|
public CacheBuilder<K, V> setExpireAfterAccess(long expireAfterAccess) {
|
||||||
if (expireAfter <= 0) {
|
if (expireAfterAccess <= 0) {
|
||||||
throw new IllegalArgumentException("expireAfter <= 0");
|
throw new IllegalArgumentException("expireAfterAccess <= 0");
|
||||||
}
|
}
|
||||||
this.expireAfter = expireAfter;
|
this.expireAfterAccess = expireAfterAccess;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CacheBuilder<K, V> setExpireAfterWrite(long expireAfterWrite) {
|
||||||
|
if (expireAfterWrite <= 0) {
|
||||||
|
throw new IllegalArgumentException("expireAfterWrite <= 0");
|
||||||
|
}
|
||||||
|
this.expireAfterWrite = expireAfterWrite;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,8 +77,11 @@ public class CacheBuilder<K, V> {
|
||||||
if (maximumWeight != -1) {
|
if (maximumWeight != -1) {
|
||||||
cache.setMaximumWeight(maximumWeight);
|
cache.setMaximumWeight(maximumWeight);
|
||||||
}
|
}
|
||||||
if (expireAfter != -1) {
|
if (expireAfterAccess != -1) {
|
||||||
cache.setExpireAfter(expireAfter);
|
cache.setExpireAfterAccess(expireAfterAccess);
|
||||||
|
}
|
||||||
|
if (expireAfterWrite != -1) {
|
||||||
|
cache.setExpireAfterWrite(expireAfterWrite);
|
||||||
}
|
}
|
||||||
if (weigher != null) {
|
if (weigher != null) {
|
||||||
cache.setWeigher(weigher);
|
cache.setWeigher(weigher);
|
||||||
|
|
|
@ -162,7 +162,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
||||||
// cacheBuilder.concurrencyLevel(concurrencyLevel);
|
// cacheBuilder.concurrencyLevel(concurrencyLevel);
|
||||||
|
|
||||||
if (expire != null) {
|
if (expire != null) {
|
||||||
cacheBuilder.setExpireAfter(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
|
cacheBuilder.setExpireAfterAccess(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
|
||||||
}
|
}
|
||||||
|
|
||||||
cache = cacheBuilder.build();
|
cache = cacheBuilder.build();
|
||||||
|
|
|
@ -156,7 +156,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
|
||||||
cacheBuilder.setMaximumWeight(cacheMaxSize);
|
cacheBuilder.setMaximumWeight(cacheMaxSize);
|
||||||
}
|
}
|
||||||
if (cacheExpire != null) {
|
if (cacheExpire != null) {
|
||||||
cacheBuilder.setExpireAfter(cacheExpire.nanos());
|
cacheBuilder.setExpireAfterAccess(cacheExpire.nanos());
|
||||||
}
|
}
|
||||||
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
|
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
|
||||||
|
|
||||||
|
|
|
@ -23,11 +23,9 @@ import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentSkipListSet;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
|
|
||||||
public class CacheTests extends ESTestCase {
|
public class CacheTests extends ESTestCase {
|
||||||
|
@ -179,7 +177,7 @@ public class CacheTests extends ESTestCase {
|
||||||
|
|
||||||
// cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that
|
// cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that
|
||||||
// the first batch of cached entries expired and were removed
|
// the first batch of cached entries expired and were removed
|
||||||
public void testExpiration() {
|
public void testExpirationAfterAccess() {
|
||||||
AtomicLong now = new AtomicLong();
|
AtomicLong now = new AtomicLong();
|
||||||
Cache<Integer, String> cache = new Cache<Integer, String>() {
|
Cache<Integer, String> cache = new Cache<Integer, String>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -187,7 +185,7 @@ public class CacheTests extends ESTestCase {
|
||||||
return now.get();
|
return now.get();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
cache.setExpireAfter(1);
|
cache.setExpireAfterAccess(1);
|
||||||
List<Integer> evictedKeys = new ArrayList<>();
|
List<Integer> evictedKeys = new ArrayList<>();
|
||||||
cache.setRemovalListener(notification -> {
|
cache.setRemovalListener(notification -> {
|
||||||
assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason());
|
assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason());
|
||||||
|
@ -216,6 +214,46 @@ public class CacheTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testExpirationAfterWrite() {
|
||||||
|
AtomicLong now = new AtomicLong();
|
||||||
|
Cache<Integer, String> cache = new Cache<Integer, String>() {
|
||||||
|
@Override
|
||||||
|
protected long now() {
|
||||||
|
return now.get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
cache.setExpireAfterWrite(1);
|
||||||
|
List<Integer> evictedKeys = new ArrayList<>();
|
||||||
|
cache.setRemovalListener(notification -> {
|
||||||
|
assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason());
|
||||||
|
evictedKeys.add(notification.getKey());
|
||||||
|
});
|
||||||
|
now.set(0);
|
||||||
|
for (int i = 0; i < numberOfEntries; i++) {
|
||||||
|
cache.put(i, Integer.toString(i));
|
||||||
|
}
|
||||||
|
now.set(1);
|
||||||
|
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
|
||||||
|
cache.put(i, Integer.toString(i));
|
||||||
|
}
|
||||||
|
now.set(2);
|
||||||
|
for (int i = 0; i < numberOfEntries; i++) {
|
||||||
|
cache.get(i);
|
||||||
|
}
|
||||||
|
cache.refresh();
|
||||||
|
assertEquals(numberOfEntries, cache.count());
|
||||||
|
for (int i = 0; i < evictedKeys.size(); i++) {
|
||||||
|
assertEquals(i, (int) evictedKeys.get(i));
|
||||||
|
}
|
||||||
|
Set<Integer> remainingKeys = new HashSet<>();
|
||||||
|
for (Integer key : cache.keys()) {
|
||||||
|
remainingKeys.add(key);
|
||||||
|
}
|
||||||
|
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
|
||||||
|
assertTrue(remainingKeys.contains(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
|
// randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
|
||||||
// non-promoted entries were removed
|
// non-promoted entries were removed
|
||||||
public void testPromotion() {
|
public void testPromotion() {
|
||||||
|
@ -226,7 +264,7 @@ public class CacheTests extends ESTestCase {
|
||||||
return now.get();
|
return now.get();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
cache.setExpireAfter(1);
|
cache.setExpireAfterAccess(1);
|
||||||
now.set(0);
|
now.set(0);
|
||||||
for (int i = 0; i < numberOfEntries; i++) {
|
for (int i = 0; i < numberOfEntries; i++) {
|
||||||
cache.put(i, Integer.toString(i));
|
cache.put(i, Integer.toString(i));
|
||||||
|
|
Loading…
Reference in New Issue