diff --git a/core/src/main/java/org/jclouds/rest/suppliers/MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier.java b/core/src/main/java/org/jclouds/rest/suppliers/MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier.java index 3c7de37deb..6fa91dfac7 100644 --- a/core/src/main/java/org/jclouds/rest/suppliers/MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier.java +++ b/core/src/main/java/org/jclouds/rest/suppliers/MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier.java @@ -18,7 +18,7 @@ */ package org.jclouds.rest.suppliers; -import static com.google.common.base.Suppliers.memoizeWithExpiration; +import static org.jclouds.util.Suppliers2.memoizeWithExpirationOnAbsoluteInterval; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +49,7 @@ public class MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier imp public MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier( AtomicReference authException, long seconds, Supplier delegate) { - this.delegate = memoizeWithExpiration(new RetryOnTimeOutExceptionSupplier( + this.delegate = memoizeWithExpirationOnAbsoluteInterval(new RetryOnTimeOutExceptionSupplier( new SetAndThrowAuthorizationExceptionSupplier(delegate, authException)), seconds, TimeUnit.SECONDS); this.seconds = seconds; } diff --git a/core/src/main/java/org/jclouds/util/Suppliers2.java b/core/src/main/java/org/jclouds/util/Suppliers2.java index 0df90ab3fd..bbfa6a187e 100644 --- a/core/src/main/java/org/jclouds/util/Suppliers2.java +++ b/core/src/main/java/org/jclouds/util/Suppliers2.java @@ -22,7 +22,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; import java.io.OutputStream; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.io.OutputSupplier; /** @@ -44,5 +49,63 @@ public class Suppliers2 { }; } - + /** + * See Supplier.memoizeWithExpiration. + * + * Difference between this impl and v11.0 is that we fix http://code.google.com/p/guava-libraries/issues/detail?id=857. + */ + public static Supplier memoizeWithExpirationOnAbsoluteInterval(Supplier delegate, long duration, TimeUnit unit) { + return new ExpiringMemoizingSupplier(delegate, duration, unit); + } + + @VisibleForTesting + static class ExpiringMemoizingSupplier implements Supplier, Serializable { + final Supplier delegate; + final long durationNanos; + transient volatile T value; + // The special value 0 means "not yet initialized". + transient volatile long expirationNanos; + + ExpiringMemoizingSupplier(Supplier delegate, long duration, TimeUnit unit) { + this.delegate = Preconditions.checkNotNull(delegate); + this.durationNanos = unit.toNanos(duration); + Preconditions.checkArgument(duration > 0); + } + + @Override + public T get() { + // Another variant of Double Checked Locking. + // + // We use two volatile reads. We could reduce this to one by + // putting our fields into a holder class, but (at least on x86) + // the extra memory consumption and indirection are more + // expensive than the extra volatile reads. + long nanos = expirationNanos; + long now = System.nanoTime(); + if (nanos == 0 || now - nanos >= 0) { + synchronized (this) { + if (nanos == expirationNanos) { // recheck for lost race + + // Set value to null prior to retrieving new val, so old and new are not held in memory simultaneously + value = null; + + T t = delegate.get(); + value = t; + + // Update now so that, if call was expensive, we keep value for the full duration + now = System.nanoTime(); + + nanos = now + durationNanos; + // In the very unlikely event that nanos is 0, set it to 1; + // no one will notice 1 ns of tardiness. + expirationNanos = (nanos == 0) ? 1 : nanos; + return t; + } + } + } + return value; + } + + private static final long serialVersionUID = 0; + } } diff --git a/core/src/test/java/org/jclouds/util/Suppliers2Test.java b/core/src/test/java/org/jclouds/util/Suppliers2Test.java new file mode 100644 index 0000000000..1583dcd27c --- /dev/null +++ b/core/src/test/java/org/jclouds/util/Suppliers2Test.java @@ -0,0 +1,221 @@ +package org.jclouds.util; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertSame; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.annotations.Test; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +public class Suppliers2Test { + + @Test + public void testMemoizeKeepsValueForFullDurationWhenDelegateCallIsSlow() { + final long SLEEP_TIME = 250; + final long EXPIRATION_TIME = 200; + + Supplier slowSupplier = new CountingSupplier() { + private static final long serialVersionUID = 1L; + + @Override public Integer get() { + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return super.get(); + } + }; + + Supplier memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval( + slowSupplier, EXPIRATION_TIME, TimeUnit.MILLISECONDS); + + assertEquals(memoizedSupplier.get(), (Integer)10); + assertEquals(memoizedSupplier.get(), (Integer)10); + } + + // ================================= + // + // TODO Everything below this point is taken from SuppliersTest, to test our version of the Suppliers2.memoizeWithExpiration + // It should be deleted when we can switch back to using the google Supplier.memoizeWithExpiration. + + private static class CountingSupplier implements Supplier, Serializable { + private static final long serialVersionUID = 0L; + transient int calls = 0; + @Override + public Integer get() { + calls++; + return calls * 10; + } + } + + @Test + public void testMemoizeWithExpiration() throws InterruptedException { + CountingSupplier countingSupplier = new CountingSupplier(); + + Supplier memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval( + countingSupplier, 75, TimeUnit.MILLISECONDS); + + checkExpiration(countingSupplier, memoizedSupplier); + } + + @Test + public void testMemoizeWithExpirationSerialized() + throws InterruptedException { + CountingSupplier countingSupplier = new CountingSupplier(); + + Supplier memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval( + countingSupplier, 75, TimeUnit.MILLISECONDS); + // Calls to the original memoized supplier shouldn't affect its copy. + memoizedSupplier.get(); + + Supplier copy = reserialize(memoizedSupplier); + memoizedSupplier.get(); + + CountingSupplier countingCopy = (CountingSupplier) + ((Suppliers2.ExpiringMemoizingSupplier) copy).delegate; + checkExpiration(countingCopy, copy); + } + + private void checkExpiration( + CountingSupplier countingSupplier, Supplier memoizedSupplier) + throws InterruptedException { + // the underlying supplier hasn't executed yet + assertEquals(0, countingSupplier.calls); + + assertEquals(10, (int) memoizedSupplier.get()); + // now it has + assertEquals(1, countingSupplier.calls); + + assertEquals(10, (int) memoizedSupplier.get()); + // it still should only have executed once due to memoization + assertEquals(1, countingSupplier.calls); + + Thread.sleep(150); + + assertEquals(20, (int) memoizedSupplier.get()); + // old value expired + assertEquals(2, countingSupplier.calls); + + assertEquals(20, (int) memoizedSupplier.get()); + // it still should only have executed twice due to memoization + assertEquals(2, countingSupplier.calls); + } + + @Test + public void testExpiringMemoizedSupplierThreadSafe() throws Throwable { + Function, Supplier> memoizer = + new Function, Supplier>() { + @Override public Supplier apply(Supplier supplier) { + return Suppliers2.memoizeWithExpirationOnAbsoluteInterval( + supplier, Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + }; + testSupplierThreadSafe(memoizer); + } + + public void testSupplierThreadSafe( + Function, Supplier> memoizer) + throws Throwable { + final AtomicInteger count = new AtomicInteger(0); + final AtomicReference thrown = + new AtomicReference(null); + final int numThreads = 3; + final Thread[] threads = new Thread[numThreads]; + final long timeout = TimeUnit.SECONDS.toNanos(60); + + final Supplier supplier = new Supplier() { + boolean isWaiting(Thread thread) { + switch (thread.getState()) { + case BLOCKED: + case WAITING: + case TIMED_WAITING: + return true; + default: + return false; + } + } + + int waitingThreads() { + int waitingThreads = 0; + for (Thread thread : threads) { + if (isWaiting(thread)) { + waitingThreads++; + } + } + return waitingThreads; + } + + @Override + public Boolean get() { + // Check that this method is called exactly once, by the first + // thread to synchronize. + long t0 = System.nanoTime(); + while (waitingThreads() != numThreads - 1) { + if (System.nanoTime() - t0 > timeout) { + thrown.set(new TimeoutException( + "timed out waiting for other threads to block" + + " synchronizing on supplier")); + break; + } + Thread.yield(); + } + count.getAndIncrement(); + return Boolean.TRUE; + } + }; + + final Supplier memoizedSupplier = memoizer.apply(supplier); + + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread() { + @Override public void run() { + assertSame(Boolean.TRUE, memoizedSupplier.get()); + } + }; + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + + if (thrown.get() != null) { + throw thrown.get(); + } + assertEquals(1, count.get()); + } + + // Taken from com.google.common.testing.SerializableTester + @SuppressWarnings("unchecked") + private static T reserialize(T object) { + checkNotNull(object); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try { + ObjectOutputStream out = new ObjectOutputStream(bytes); + out.writeObject(object); + ObjectInputStream in = new ObjectInputStream( + new ByteArrayInputStream(bytes.toByteArray())); + return (T) in.readObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } +} diff --git a/resources/NOTICE.txt b/resources/NOTICE.txt index eef2bb325c..54b9419d75 100644 --- a/resources/NOTICE.txt +++ b/resources/NOTICE.txt @@ -53,6 +53,10 @@ distributed under the Apache Software License, Version 2.0. This product includes Guava (http://code.google.com/p/guava-libraries) distributed under the Apache Software License, Version 2.0. +org.jclouds.util.Suppliers2 and Suppliers2Test contain pieces of Guava's Suppliers +distributed under the Apache Software License, Version 2.0. +(c) 2010 Google Inc. + This product includes BouncyCastle (http://www.bouncycastle.org/licence.html) distributed with the following notice: "Copyright (c) 2000 - 2011 The Legion Of The Bouncy Castle (http://www.bouncycastle.org)