mirror of https://github.com/apache/jclouds.git
Merge branch 'master' of github.com:jclouds/jclouds
* 'master' of github.com:jclouds/jclouds: Issue 764: added Suppliers2.memoizeWithExpirationOnAbsoluteInterval
This commit is contained in:
commit
9e6960b4a7
|
@ -18,7 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.rest.suppliers;
|
package org.jclouds.rest.suppliers;
|
||||||
|
|
||||||
import static com.google.common.base.Suppliers.memoizeWithExpiration;
|
import static org.jclouds.util.Suppliers2.memoizeWithExpirationOnAbsoluteInterval;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -49,7 +49,7 @@ public class MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier<T> imp
|
||||||
|
|
||||||
public MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier(
|
public MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier(
|
||||||
AtomicReference<AuthorizationException> authException, long seconds, Supplier<T> delegate) {
|
AtomicReference<AuthorizationException> authException, long seconds, Supplier<T> delegate) {
|
||||||
this.delegate = memoizeWithExpiration(new RetryOnTimeOutExceptionSupplier<T>(
|
this.delegate = memoizeWithExpirationOnAbsoluteInterval(new RetryOnTimeOutExceptionSupplier<T>(
|
||||||
new SetAndThrowAuthorizationExceptionSupplier<T>(delegate, authException)), seconds, TimeUnit.SECONDS);
|
new SetAndThrowAuthorizationExceptionSupplier<T>(delegate, authException)), seconds, TimeUnit.SECONDS);
|
||||||
this.seconds = seconds;
|
this.seconds = seconds;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.io.OutputSupplier;
|
import com.google.common.io.OutputSupplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,5 +49,63 @@ public class Suppliers2 {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See Supplier.memoizeWithExpiration.
|
||||||
|
*
|
||||||
|
* Difference between this impl and v11.0 is that we fix http://code.google.com/p/guava-libraries/issues/detail?id=857.
|
||||||
|
*/
|
||||||
|
public static <T> Supplier<T> memoizeWithExpirationOnAbsoluteInterval(Supplier<T> delegate, long duration, TimeUnit unit) {
|
||||||
|
return new ExpiringMemoizingSupplier<T>(delegate, duration, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static class ExpiringMemoizingSupplier<T> implements Supplier<T>, Serializable {
|
||||||
|
final Supplier<T> delegate;
|
||||||
|
final long durationNanos;
|
||||||
|
transient volatile T value;
|
||||||
|
// The special value 0 means "not yet initialized".
|
||||||
|
transient volatile long expirationNanos;
|
||||||
|
|
||||||
|
ExpiringMemoizingSupplier(Supplier<T> delegate, long duration, TimeUnit unit) {
|
||||||
|
this.delegate = Preconditions.checkNotNull(delegate);
|
||||||
|
this.durationNanos = unit.toNanos(duration);
|
||||||
|
Preconditions.checkArgument(duration > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get() {
|
||||||
|
// Another variant of Double Checked Locking.
|
||||||
|
//
|
||||||
|
// We use two volatile reads. We could reduce this to one by
|
||||||
|
// putting our fields into a holder class, but (at least on x86)
|
||||||
|
// the extra memory consumption and indirection are more
|
||||||
|
// expensive than the extra volatile reads.
|
||||||
|
long nanos = expirationNanos;
|
||||||
|
long now = System.nanoTime();
|
||||||
|
if (nanos == 0 || now - nanos >= 0) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (nanos == expirationNanos) { // recheck for lost race
|
||||||
|
|
||||||
|
// Set value to null prior to retrieving new val, so old and new are not held in memory simultaneously
|
||||||
|
value = null;
|
||||||
|
|
||||||
|
T t = delegate.get();
|
||||||
|
value = t;
|
||||||
|
|
||||||
|
// Update now so that, if call was expensive, we keep value for the full duration
|
||||||
|
now = System.nanoTime();
|
||||||
|
|
||||||
|
nanos = now + durationNanos;
|
||||||
|
// In the very unlikely event that nanos is 0, set it to 1;
|
||||||
|
// no one will notice 1 ns of tardiness.
|
||||||
|
expirationNanos = (nanos == 0) ? 1 : nanos;
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,221 @@
|
||||||
|
package org.jclouds.util;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static org.testng.Assert.assertEquals;
|
||||||
|
import static org.testng.Assert.assertSame;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
|
import java.io.ObjectOutputStream;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.base.Suppliers;
|
||||||
|
|
||||||
|
public class Suppliers2Test {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMemoizeKeepsValueForFullDurationWhenDelegateCallIsSlow() {
|
||||||
|
final long SLEEP_TIME = 250;
|
||||||
|
final long EXPIRATION_TIME = 200;
|
||||||
|
|
||||||
|
Supplier<Integer> slowSupplier = new CountingSupplier() {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
@Override public Integer get() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
return super.get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Supplier<Integer> memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
|
||||||
|
slowSupplier, EXPIRATION_TIME, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
assertEquals(memoizedSupplier.get(), (Integer)10);
|
||||||
|
assertEquals(memoizedSupplier.get(), (Integer)10);
|
||||||
|
}
|
||||||
|
|
||||||
|
// =================================
|
||||||
|
//
|
||||||
|
// TODO Everything below this point is taken from SuppliersTest, to test our version of the Suppliers2.memoizeWithExpiration
|
||||||
|
// It should be deleted when we can switch back to using the google Supplier.memoizeWithExpiration.
|
||||||
|
|
||||||
|
private static class CountingSupplier implements Supplier<Integer>, Serializable {
|
||||||
|
private static final long serialVersionUID = 0L;
|
||||||
|
transient int calls = 0;
|
||||||
|
@Override
|
||||||
|
public Integer get() {
|
||||||
|
calls++;
|
||||||
|
return calls * 10;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMemoizeWithExpiration() throws InterruptedException {
|
||||||
|
CountingSupplier countingSupplier = new CountingSupplier();
|
||||||
|
|
||||||
|
Supplier<Integer> memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
|
||||||
|
countingSupplier, 75, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
checkExpiration(countingSupplier, memoizedSupplier);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMemoizeWithExpirationSerialized()
|
||||||
|
throws InterruptedException {
|
||||||
|
CountingSupplier countingSupplier = new CountingSupplier();
|
||||||
|
|
||||||
|
Supplier<Integer> memoizedSupplier = Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
|
||||||
|
countingSupplier, 75, TimeUnit.MILLISECONDS);
|
||||||
|
// Calls to the original memoized supplier shouldn't affect its copy.
|
||||||
|
memoizedSupplier.get();
|
||||||
|
|
||||||
|
Supplier<Integer> copy = reserialize(memoizedSupplier);
|
||||||
|
memoizedSupplier.get();
|
||||||
|
|
||||||
|
CountingSupplier countingCopy = (CountingSupplier)
|
||||||
|
((Suppliers2.ExpiringMemoizingSupplier<Integer>) copy).delegate;
|
||||||
|
checkExpiration(countingCopy, copy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkExpiration(
|
||||||
|
CountingSupplier countingSupplier, Supplier<Integer> memoizedSupplier)
|
||||||
|
throws InterruptedException {
|
||||||
|
// the underlying supplier hasn't executed yet
|
||||||
|
assertEquals(0, countingSupplier.calls);
|
||||||
|
|
||||||
|
assertEquals(10, (int) memoizedSupplier.get());
|
||||||
|
// now it has
|
||||||
|
assertEquals(1, countingSupplier.calls);
|
||||||
|
|
||||||
|
assertEquals(10, (int) memoizedSupplier.get());
|
||||||
|
// it still should only have executed once due to memoization
|
||||||
|
assertEquals(1, countingSupplier.calls);
|
||||||
|
|
||||||
|
Thread.sleep(150);
|
||||||
|
|
||||||
|
assertEquals(20, (int) memoizedSupplier.get());
|
||||||
|
// old value expired
|
||||||
|
assertEquals(2, countingSupplier.calls);
|
||||||
|
|
||||||
|
assertEquals(20, (int) memoizedSupplier.get());
|
||||||
|
// it still should only have executed twice due to memoization
|
||||||
|
assertEquals(2, countingSupplier.calls);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpiringMemoizedSupplierThreadSafe() throws Throwable {
|
||||||
|
Function<Supplier<Boolean>, Supplier<Boolean>> memoizer =
|
||||||
|
new Function<Supplier<Boolean>, Supplier<Boolean>>() {
|
||||||
|
@Override public Supplier<Boolean> apply(Supplier<Boolean> supplier) {
|
||||||
|
return Suppliers2.memoizeWithExpirationOnAbsoluteInterval(
|
||||||
|
supplier, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
testSupplierThreadSafe(memoizer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSupplierThreadSafe(
|
||||||
|
Function<Supplier<Boolean>, Supplier<Boolean>> memoizer)
|
||||||
|
throws Throwable {
|
||||||
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
|
final AtomicReference<Throwable> thrown =
|
||||||
|
new AtomicReference<Throwable>(null);
|
||||||
|
final int numThreads = 3;
|
||||||
|
final Thread[] threads = new Thread[numThreads];
|
||||||
|
final long timeout = TimeUnit.SECONDS.toNanos(60);
|
||||||
|
|
||||||
|
final Supplier<Boolean> supplier = new Supplier<Boolean>() {
|
||||||
|
boolean isWaiting(Thread thread) {
|
||||||
|
switch (thread.getState()) {
|
||||||
|
case BLOCKED:
|
||||||
|
case WAITING:
|
||||||
|
case TIMED_WAITING:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int waitingThreads() {
|
||||||
|
int waitingThreads = 0;
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
if (isWaiting(thread)) {
|
||||||
|
waitingThreads++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return waitingThreads;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
// Check that this method is called exactly once, by the first
|
||||||
|
// thread to synchronize.
|
||||||
|
long t0 = System.nanoTime();
|
||||||
|
while (waitingThreads() != numThreads - 1) {
|
||||||
|
if (System.nanoTime() - t0 > timeout) {
|
||||||
|
thrown.set(new TimeoutException(
|
||||||
|
"timed out waiting for other threads to block" +
|
||||||
|
" synchronizing on supplier"));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.yield();
|
||||||
|
}
|
||||||
|
count.getAndIncrement();
|
||||||
|
return Boolean.TRUE;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final Supplier<Boolean> memoizedSupplier = memoizer.apply(supplier);
|
||||||
|
|
||||||
|
for (int i = 0; i < numThreads; i++) {
|
||||||
|
threads[i] = new Thread() {
|
||||||
|
@Override public void run() {
|
||||||
|
assertSame(Boolean.TRUE, memoizedSupplier.get());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
for (Thread t : threads) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
for (Thread t : threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thrown.get() != null) {
|
||||||
|
throw thrown.get();
|
||||||
|
}
|
||||||
|
assertEquals(1, count.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Taken from com.google.common.testing.SerializableTester
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static <T> T reserialize(T object) {
|
||||||
|
checkNotNull(object);
|
||||||
|
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||||
|
try {
|
||||||
|
ObjectOutputStream out = new ObjectOutputStream(bytes);
|
||||||
|
out.writeObject(object);
|
||||||
|
ObjectInputStream in = new ObjectInputStream(
|
||||||
|
new ByteArrayInputStream(bytes.toByteArray()));
|
||||||
|
return (T) in.readObject();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -41,6 +41,10 @@ distributed under the Apache Software License, Version 2.0.
|
||||||
This product includes Guava (http://code.google.com/p/guava-libraries)
|
This product includes Guava (http://code.google.com/p/guava-libraries)
|
||||||
distributed under the Apache Software License, Version 2.0.
|
distributed under the Apache Software License, Version 2.0.
|
||||||
|
|
||||||
|
org.jclouds.util.Suppliers2 and Suppliers2Test contain pieces of Guava's Suppliers
|
||||||
|
distributed under the Apache Software License, Version 2.0.
|
||||||
|
(c) 2010 Google Inc.
|
||||||
|
|
||||||
This product includes BouncyCastle (http://www.bouncycastle.org/licence.html)
|
This product includes BouncyCastle (http://www.bouncycastle.org/licence.html)
|
||||||
distributed with the following notice:
|
distributed with the following notice:
|
||||||
"Copyright (c) 2000 - 2011 The Legion Of The Bouncy Castle (http://www.bouncycastle.org)
|
"Copyright (c) 2000 - 2011 The Legion Of The Bouncy Castle (http://www.bouncycastle.org)
|
||||||
|
|
Loading…
Reference in New Issue