Issue 764: added Suppliers2.memoizeWithExpirationOnAbsoluteInterval

This commit is contained in:
Aled Sage 2012-01-09 18:03:30 +00:00
parent 56aa4e5ece
commit 557c5db706
4 changed files with 291 additions and 3 deletions

View File

@ -18,7 +18,7 @@
*/
package org.jclouds.rest.suppliers;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static org.jclouds.util.Suppliers2.memoizeWithExpirationOnAbsoluteInterval;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -49,7 +49,7 @@ public class MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier<T> imp
public MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier(
AtomicReference<AuthorizationException> authException, long seconds, Supplier<T> delegate) {
this.delegate = memoizeWithExpiration(new RetryOnTimeOutExceptionSupplier<T>(
this.delegate = memoizeWithExpirationOnAbsoluteInterval(new RetryOnTimeOutExceptionSupplier<T>(
new SetAndThrowAuthorizationExceptionSupplier<T>(delegate, authException)), seconds, TimeUnit.SECONDS);
this.seconds = seconds;
}

View File

@ -22,7 +22,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.io.OutputSupplier;
/**
@ -44,5 +49,63 @@ public class Suppliers2 {
};
}
/**
* See Supplier.memoizeWithExpiration.
*
* Difference between this impl and v11.0 is that we fix http://code.google.com/p/guava-libraries/issues/detail?id=857.
*/
public static <T> Supplier<T> memoizeWithExpirationOnAbsoluteInterval(Supplier<T> delegate, long duration, TimeUnit unit) {
return new ExpiringMemoizingSupplier<T>(delegate, duration, unit);
}
@VisibleForTesting
static class ExpiringMemoizingSupplier<T> implements Supplier<T>, Serializable {
final Supplier<T> delegate;
final long durationNanos;
transient volatile T value;
// The special value 0 means "not yet initialized".
transient volatile long expirationNanos;
ExpiringMemoizingSupplier(Supplier<T> delegate, long duration, TimeUnit unit) {
this.delegate = Preconditions.checkNotNull(delegate);
this.durationNanos = unit.toNanos(duration);
Preconditions.checkArgument(duration > 0);
}
@Override
public T get() {
// Another variant of Double Checked Locking.
//
// We use two volatile reads. We could reduce this to one by
// putting our fields into a holder class, but (at least on x86)
// the extra memory consumption and indirection are more
// expensive than the extra volatile reads.
long nanos = expirationNanos;
long now = System.nanoTime();
if (nanos == 0 || now - nanos >= 0) {
synchronized (this) {
if (nanos == expirationNanos) { // recheck for lost race
// Set value to null prior to retrieving new val, so old and new are not held in memory simultaneously
value = null;
T t = delegate.get();
value = t;
// Update now so that, if call was expensive, we keep value for the full duration
now = System.nanoTime();
nanos = now + durationNanos;
// In the very unlikely event that nanos is 0, set it to 1;
// no one will notice 1 ns of tardiness.
expirationNanos = (nanos == 0) ? 1 : nanos;
return t;
}
}
}
return value;
}
private static final long serialVersionUID = 0;
}
}

View File

@ -0,0 +1,221 @@
package org.jclouds.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
public class Suppliers2Test {
@Test
public void testMemoizeKeepsValueForFullDurationWhenDelegateCallIsSlow() {
final long SLEEP_TIME = 250;
final long EXPIRATION_TIME = 200;
Supplier<Integer> slowSupplier = new CountingSupplier() {
private static final long serialVersionUID = 1L;
@Override public Integer get() {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return super.get();
}
};
Supplier<Integer> memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
slowSupplier, EXPIRATION_TIME, TimeUnit.MILLISECONDS);
assertEquals(memoizedSupplier.get(), (Integer)10);
assertEquals(memoizedSupplier.get(), (Integer)10);
}
// =================================
//
// TODO Everything below this point is taken from SuppliersTest, to test our version of the Suppliers2.memoizeWithExpiration
// It should be deleted when we can switch back to using the google Supplier.memoizeWithExpiration.
private static class CountingSupplier implements Supplier<Integer>, Serializable {
private static final long serialVersionUID = 0L;
transient int calls = 0;
@Override
public Integer get() {
calls++;
return calls * 10;
}
}
@Test
public void testMemoizeWithExpiration() throws InterruptedException {
CountingSupplier countingSupplier = new CountingSupplier();
Supplier<Integer> memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
countingSupplier, 75, TimeUnit.MILLISECONDS);
checkExpiration(countingSupplier, memoizedSupplier);
}
@Test
public void testMemoizeWithExpirationSerialized()
throws InterruptedException {
CountingSupplier countingSupplier = new CountingSupplier();
Supplier<Integer> memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
countingSupplier, 75, TimeUnit.MILLISECONDS);
// Calls to the original memoized supplier shouldn't affect its copy.
memoizedSupplier.get();
Supplier<Integer> copy = reserialize(memoizedSupplier);
memoizedSupplier.get();
CountingSupplier countingCopy = (CountingSupplier)
((Suppliers2.ExpiringMemoizingSupplier<Integer>) copy).delegate;
checkExpiration(countingCopy, copy);
}
private void checkExpiration(
CountingSupplier countingSupplier, Supplier<Integer> memoizedSupplier)
throws InterruptedException {
// the underlying supplier hasn't executed yet
assertEquals(0, countingSupplier.calls);
assertEquals(10, (int) memoizedSupplier.get());
// now it has
assertEquals(1, countingSupplier.calls);
assertEquals(10, (int) memoizedSupplier.get());
// it still should only have executed once due to memoization
assertEquals(1, countingSupplier.calls);
Thread.sleep(150);
assertEquals(20, (int) memoizedSupplier.get());
// old value expired
assertEquals(2, countingSupplier.calls);
assertEquals(20, (int) memoizedSupplier.get());
// it still should only have executed twice due to memoization
assertEquals(2, countingSupplier.calls);
}
@Test
public void testExpiringMemoizedSupplierThreadSafe() throws Throwable {
Function<Supplier<Boolean>, Supplier<Boolean>> memoizer =
new Function<Supplier<Boolean>, Supplier<Boolean>>() {
@Override public Supplier<Boolean> apply(Supplier<Boolean> supplier) {
return Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
supplier, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
};
testSupplierThreadSafe(memoizer);
}
public void testSupplierThreadSafe(
Function<Supplier<Boolean>, Supplier<Boolean>> memoizer)
throws Throwable {
final AtomicInteger count = new AtomicInteger(0);
final AtomicReference<Throwable> thrown =
new AtomicReference<Throwable>(null);
final int numThreads = 3;
final Thread[] threads = new Thread[numThreads];
final long timeout = TimeUnit.SECONDS.toNanos(60);
final Supplier<Boolean> supplier = new Supplier<Boolean>() {
boolean isWaiting(Thread thread) {
switch (thread.getState()) {
case BLOCKED:
case WAITING:
case TIMED_WAITING:
return true;
default:
return false;
}
}
int waitingThreads() {
int waitingThreads = 0;
for (Thread thread : threads) {
if (isWaiting(thread)) {
waitingThreads++;
}
}
return waitingThreads;
}
@Override
public Boolean get() {
// Check that this method is called exactly once, by the first
// thread to synchronize.
long t0 = System.nanoTime();
while (waitingThreads() != numThreads - 1) {
if (System.nanoTime() - t0 > timeout) {
thrown.set(new TimeoutException(
"timed out waiting for other threads to block" +
" synchronizing on supplier"));
break;
}
Thread.yield();
}
count.getAndIncrement();
return Boolean.TRUE;
}
};
final Supplier<Boolean> memoizedSupplier = memoizer.apply(supplier);
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread() {
@Override public void run() {
assertSame(Boolean.TRUE, memoizedSupplier.get());
}
};
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
if (thrown.get() != null) {
throw thrown.get();
}
assertEquals(1, count.get());
}
// Taken from com.google.common.testing.SerializableTester
@SuppressWarnings("unchecked")
private static <T> T reserialize(T object) {
checkNotNull(object);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
ObjectOutputStream out = new ObjectOutputStream(bytes);
out.writeObject(object);
ObjectInputStream in = new ObjectInputStream(
new ByteArrayInputStream(bytes.toByteArray()));
return (T) in.readObject();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -53,6 +53,10 @@ distributed under the Apache Software License, Version 2.0.
This product includes Guava (http://code.google.com/p/guava-libraries)
distributed under the Apache Software License, Version 2.0.
org.jclouds.util.Suppliers2 and Suppliers2Test contain pieces of Guava's Suppliers
distributed under the Apache Software License, Version 2.0.
(c) 2010 Google Inc.
This product includes BouncyCastle (http://www.bouncycastle.org/licence.html)
distributed with the following notice:
"Copyright (c) 2000 - 2011 The Legion Of The Bouncy Castle (http://www.bouncycastle.org)