mirror of https://github.com/apache/jclouds.git
Merge remote branch 'upstream/master'
This commit is contained in:
commit
3011ffad23
|
@ -41,6 +41,7 @@ import org.jclouds.blobstore.domain.Blob;
|
||||||
import org.jclouds.blobstore.domain.BlobBuilder;
|
import org.jclouds.blobstore.domain.BlobBuilder;
|
||||||
import org.jclouds.blobstore.domain.internal.BlobBuilderImpl;
|
import org.jclouds.blobstore.domain.internal.BlobBuilderImpl;
|
||||||
import org.jclouds.blobstore.options.ListContainerOptions;
|
import org.jclouds.blobstore.options.ListContainerOptions;
|
||||||
|
import org.jclouds.encryption.internal.JCECrypto;
|
||||||
import org.jclouds.filesystem.predicates.validators.internal.FilesystemBlobKeyValidatorImpl;
|
import org.jclouds.filesystem.predicates.validators.internal.FilesystemBlobKeyValidatorImpl;
|
||||||
import org.jclouds.filesystem.predicates.validators.internal.FilesystemContainerNameValidatorImpl;
|
import org.jclouds.filesystem.predicates.validators.internal.FilesystemContainerNameValidatorImpl;
|
||||||
import org.jclouds.filesystem.strategy.FilesystemStorageStrategy;
|
import org.jclouds.filesystem.strategy.FilesystemStorageStrategy;
|
||||||
|
@ -76,7 +77,11 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
storageStrategy = new FilesystemStorageStrategyImpl(new Provider<BlobBuilder>() {
|
storageStrategy = new FilesystemStorageStrategyImpl(new Provider<BlobBuilder>() {
|
||||||
@Override
|
@Override
|
||||||
public BlobBuilder get() {
|
public BlobBuilder get() {
|
||||||
return new BlobBuilderImpl();
|
try {
|
||||||
|
return new BlobBuilderImpl(new JCECrypto());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}, TestUtils.TARGET_BASE_DIR, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl());
|
}, TestUtils.TARGET_BASE_DIR, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl());
|
||||||
|
@ -139,9 +144,8 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
|
|
||||||
public void testDeleteDirectory() throws IOException {
|
public void testDeleteDirectory() throws IOException {
|
||||||
TestUtils.createContainerAsDirectory(CONTAINER_NAME);
|
TestUtils.createContainerAsDirectory(CONTAINER_NAME);
|
||||||
TestUtils.createBlobsInContainer(
|
TestUtils.createBlobsInContainer(CONTAINER_NAME, new String[] {
|
||||||
CONTAINER_NAME,
|
TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev3" + FS, ".txt"),
|
||||||
new String[] { TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev3" + FS, ".txt"),
|
|
||||||
TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev4" + FS, ".jpg") });
|
TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev4" + FS, ".jpg") });
|
||||||
|
|
||||||
// delete directory in different ways
|
// delete directory in different ways
|
||||||
|
@ -158,9 +162,8 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
TestUtils.directoryExists(TARGET_CONTAINER_NAME, true);
|
TestUtils.directoryExists(TARGET_CONTAINER_NAME, true);
|
||||||
|
|
||||||
// delete the directory and all the files inside
|
// delete the directory and all the files inside
|
||||||
TestUtils.createBlobsInContainer(
|
TestUtils.createBlobsInContainer(CONTAINER_NAME, new String[] {
|
||||||
CONTAINER_NAME,
|
TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev3" + FS, ".txt"),
|
||||||
new String[] { TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev3" + FS, ".txt"),
|
|
||||||
TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev4" + FS, ".jpg") });
|
TestUtils.createRandomBlobKey("lev1" + FS + "lev2" + FS + "lev4" + FS, ".jpg") });
|
||||||
storageStrategy.deleteDirectory(CONTAINER_NAME, null);
|
storageStrategy.deleteDirectory(CONTAINER_NAME, null);
|
||||||
TestUtils.directoryExists(TARGET_CONTAINER_NAME, false);
|
TestUtils.directoryExists(TARGET_CONTAINER_NAME, false);
|
||||||
|
@ -208,9 +211,8 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
|
|
||||||
public void testClearContainer() throws IOException {
|
public void testClearContainer() throws IOException {
|
||||||
storageStrategy.createContainer(CONTAINER_NAME);
|
storageStrategy.createContainer(CONTAINER_NAME);
|
||||||
Set<String> blobs = TestUtils.createBlobsInContainer(
|
Set<String> blobs = TestUtils.createBlobsInContainer(CONTAINER_NAME, new String[] {
|
||||||
CONTAINER_NAME,
|
TestUtils.createRandomBlobKey("clean_container-", ".jpg"),
|
||||||
new String[] { TestUtils.createRandomBlobKey("clean_container-", ".jpg"),
|
|
||||||
TestUtils.createRandomBlobKey("bf" + FS + "sd" + FS + "as" + FS + "clean_container-", ".jpg") });
|
TestUtils.createRandomBlobKey("bf" + FS + "sd" + FS + "as" + FS + "clean_container-", ".jpg") });
|
||||||
// test if file exits
|
// test if file exits
|
||||||
for (String blob : blobs) {
|
for (String blob : blobs) {
|
||||||
|
@ -238,9 +240,8 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
|
|
||||||
public void testClearContainerAndThenDeleteContainer() throws IOException {
|
public void testClearContainerAndThenDeleteContainer() throws IOException {
|
||||||
storageStrategy.createContainer(CONTAINER_NAME);
|
storageStrategy.createContainer(CONTAINER_NAME);
|
||||||
Set<String> blobs = TestUtils.createBlobsInContainer(
|
Set<String> blobs = TestUtils.createBlobsInContainer(CONTAINER_NAME, new String[] {
|
||||||
CONTAINER_NAME,
|
TestUtils.createRandomBlobKey("clean_container-", ".jpg"),
|
||||||
new String[] { TestUtils.createRandomBlobKey("clean_container-", ".jpg"),
|
|
||||||
TestUtils.createRandomBlobKey("bf" + FS + "sd" + FS + "as" + FS + "clean_container-", ".jpg") });
|
TestUtils.createRandomBlobKey("bf" + FS + "sd" + FS + "as" + FS + "clean_container-", ".jpg") });
|
||||||
// test if file exits
|
// test if file exits
|
||||||
for (String blob : blobs) {
|
for (String blob : blobs) {
|
||||||
|
@ -402,7 +403,11 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
new Provider<BlobBuilder>() {
|
new Provider<BlobBuilder>() {
|
||||||
@Override
|
@Override
|
||||||
public BlobBuilder get() {
|
public BlobBuilder get() {
|
||||||
return new BlobBuilderImpl();
|
try {
|
||||||
|
return new BlobBuilderImpl(new JCECrypto());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, absoluteBasePath, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl());
|
}, absoluteBasePath, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl());
|
||||||
TestUtils.cleanDirectoryContent(absoluteContainerPath);
|
TestUtils.cleanDirectoryContent(absoluteContainerPath);
|
||||||
|
@ -436,9 +441,8 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
|
|
||||||
public void testRemoveBlob() throws IOException {
|
public void testRemoveBlob() throws IOException {
|
||||||
storageStrategy.createContainer(CONTAINER_NAME);
|
storageStrategy.createContainer(CONTAINER_NAME);
|
||||||
Set<String> blobKeys = TestUtils.createBlobsInContainer(
|
Set<String> blobKeys = TestUtils.createBlobsInContainer(CONTAINER_NAME, new String[] {
|
||||||
CONTAINER_NAME,
|
TestUtils.createRandomBlobKey("removeBlob-", ".jpg"),
|
||||||
new String[] { TestUtils.createRandomBlobKey("removeBlob-", ".jpg"),
|
|
||||||
TestUtils.createRandomBlobKey("removeBlob-", ".jpg"),
|
TestUtils.createRandomBlobKey("removeBlob-", ".jpg"),
|
||||||
TestUtils.createRandomBlobKey("346" + FS + "g3sx2" + FS + "removeBlob-", ".jpg"),
|
TestUtils.createRandomBlobKey("346" + FS + "g3sx2" + FS + "removeBlob-", ".jpg"),
|
||||||
TestUtils.createRandomBlobKey("346" + FS + "g3sx2" + FS + "removeBlob-", ".jpg") });
|
TestUtils.createRandomBlobKey("346" + FS + "g3sx2" + FS + "removeBlob-", ".jpg") });
|
||||||
|
@ -478,9 +482,8 @@ public class FilesystemStorageStrategyImplTest {
|
||||||
|
|
||||||
// create blobs
|
// create blobs
|
||||||
storageStrategy.createContainer(CONTAINER_NAME);
|
storageStrategy.createContainer(CONTAINER_NAME);
|
||||||
Set<String> createBlobKeys = TestUtils.createBlobsInContainer(
|
Set<String> createBlobKeys = TestUtils.createBlobsInContainer(CONTAINER_NAME, new String[] {
|
||||||
CONTAINER_NAME,
|
TestUtils.createRandomBlobKey("GetBlobKeys-", ".jpg"),
|
||||||
new String[] { TestUtils.createRandomBlobKey("GetBlobKeys-", ".jpg"),
|
|
||||||
TestUtils.createRandomBlobKey("GetBlobKeys-", ".jpg"),
|
TestUtils.createRandomBlobKey("GetBlobKeys-", ".jpg"),
|
||||||
TestUtils.createRandomBlobKey("563" + FS + "g3sx2" + FS + "removeBlob-", ".jpg"),
|
TestUtils.createRandomBlobKey("563" + FS + "g3sx2" + FS + "removeBlob-", ".jpg"),
|
||||||
TestUtils.createRandomBlobKey("563" + FS + "g3sx2" + FS + "removeBlob-", ".jpg") });
|
TestUtils.createRandomBlobKey("563" + FS + "g3sx2" + FS + "removeBlob-", ".jpg") });
|
||||||
|
|
|
@ -26,10 +26,8 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
@ -40,20 +38,23 @@ import org.jclouds.io.Payload;
|
||||||
import org.jclouds.io.Payloads;
|
import org.jclouds.io.Payloads;
|
||||||
import org.jclouds.io.payloads.PhantomPayload;
|
import org.jclouds.io.payloads.PhantomPayload;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Adrian Cole
|
* @author Adrian Cole
|
||||||
*/
|
*/
|
||||||
public class BlobBuilderImpl implements BlobBuilder {
|
public class BlobBuilderImpl implements BlobBuilder {
|
||||||
|
private final Crypto crypto;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public BlobBuilderImpl(Crypto crypto) {
|
||||||
|
this.crypto = checkNotNull(crypto, "crypto");
|
||||||
|
}
|
||||||
|
|
||||||
private Payload payload;
|
private Payload payload;
|
||||||
private String name;
|
private String name;
|
||||||
private Map<String, String> userMetadata = Maps.newLinkedHashMap();
|
private Map<String, String> userMetadata = Maps.newLinkedHashMap();
|
||||||
private StorageType type = StorageType.BLOB;
|
private StorageType type = StorageType.BLOB;
|
||||||
@Inject
|
|
||||||
private Crypto crypto;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlobBuilder name(String name) {
|
public BlobBuilder name(String name) {
|
||||||
|
@ -129,15 +130,10 @@ public class BlobBuilderImpl implements BlobBuilder {
|
||||||
private final Payload payload;
|
private final Payload payload;
|
||||||
private MessageDigest digest;
|
private MessageDigest digest;
|
||||||
|
|
||||||
public PayloadBlobBuilderImpl(BlobBuilder builder, Payload payload, @Nullable Crypto crypto) {
|
public PayloadBlobBuilderImpl(BlobBuilder builder, Payload payload, Crypto crypto) {
|
||||||
this.builder = checkNotNull(builder, "builder");
|
this.builder = checkNotNull(builder, "builder");
|
||||||
this.payload = checkNotNull(payload, "payload");
|
this.payload = checkNotNull(payload, "payload");
|
||||||
try {
|
this.digest = checkNotNull(crypto, "crypto").md5();
|
||||||
this.digest = crypto != null ? crypto.md5() : MessageDigest.getInstance("MD5");
|
|
||||||
} catch (NoSuchAlgorithmException e) {
|
|
||||||
Throwables.propagate(e);
|
|
||||||
this.digest = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -58,6 +58,7 @@ import com.google.inject.Inject;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
|
import com.google.inject.ProvisionException;
|
||||||
import com.google.inject.TypeLiteral;
|
import com.google.inject.TypeLiteral;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
|
@ -106,12 +107,19 @@ public class AsyncRestClientProxy<T> implements InvocationHandler {
|
||||||
} else if (method.getName().equals("hashCode")) {
|
} else if (method.getName().equals("hashCode")) {
|
||||||
return this.hashCode();
|
return this.hashCode();
|
||||||
} else if (method.isAnnotationPresent(Provides.class)) {
|
} else if (method.isAnnotationPresent(Provides.class)) {
|
||||||
|
try {
|
||||||
try {
|
try {
|
||||||
Annotation qualifier = Iterables.find(ImmutableList.copyOf(method.getAnnotations()), isQualifierPresent);
|
Annotation qualifier = Iterables.find(ImmutableList.copyOf(method.getAnnotations()), isQualifierPresent);
|
||||||
return injector.getInstance(Key.get(method.getGenericReturnType(), qualifier));
|
return injector.getInstance(Key.get(method.getGenericReturnType(), qualifier));
|
||||||
} catch (NoSuchElementException e) {
|
} catch (NoSuchElementException e) {
|
||||||
return injector.getInstance(Key.get(method.getGenericReturnType()));
|
return injector.getInstance(Key.get(method.getGenericReturnType()));
|
||||||
}
|
}
|
||||||
|
} catch (ProvisionException e) {
|
||||||
|
AuthorizationException aex = Throwables2.getFirstThrowableOfType(e, AuthorizationException.class);
|
||||||
|
if (aex != null)
|
||||||
|
throw aex;
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
} else if (method.isAnnotationPresent(Delegate.class)) {
|
} else if (method.isAnnotationPresent(Delegate.class)) {
|
||||||
return delegateMap.get(new ClassMethodArgs(method.getReturnType(), method, args));
|
return delegateMap.get(new ClassMethodArgs(method.getReturnType(), method, args));
|
||||||
} else if (annotationProcessor.getDelegateOrNull(method) != null
|
} else if (annotationProcessor.getDelegateOrNull(method) != null
|
||||||
|
|
|
@ -0,0 +1,258 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||||
|
*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.google.common.util.concurrent;
|
||||||
|
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
|
import static com.google.common.collect.Maps.newHashMap;
|
||||||
|
import static java.util.concurrent.Executors.newCachedThreadPool;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In google appengine, we can get a future without using an executorservice, using its async http
|
||||||
|
* fetch command. However, we still may need to do some conversions, or add listeners. In
|
||||||
|
* googleappengine, we cannot employ a *real* executorservice, but we can employ a same thread
|
||||||
|
* executor. This test identifies efficiencies that can be made by strengthening guava's handling of
|
||||||
|
* same thread execution.
|
||||||
|
*
|
||||||
|
* <p/>
|
||||||
|
*
|
||||||
|
* We simulate an i/o future by running a callable that simply sleeps. How this is created isn't
|
||||||
|
* important.
|
||||||
|
*
|
||||||
|
* <ol>
|
||||||
|
* <li>{@code IO_DURATION} is the time that the source future spends doing work</li>
|
||||||
|
* <li>{@code LISTENER_DURATION} is the time that the attached listener or function</li>
|
||||||
|
* </ol>
|
||||||
|
*
|
||||||
|
* The execution time of a composed task within a composite should not be more than {@code
|
||||||
|
* IO_DURATION} + {@code LISTENER_DURATION} + overhead when a threadpool is used. This is because
|
||||||
|
* the listener should be invoked as soon as the result is available.
|
||||||
|
* <p/>
|
||||||
|
* The execution time of a composed task within a composite should not be more than {@code
|
||||||
|
* IO_DURATION} + {@code LISTENER_DURATION} * {@code COUNT} + overhead when caller thread is used
|
||||||
|
* for handling the listeners.
|
||||||
|
* <p/>
|
||||||
|
* This test shows that Futures.compose eagerly issues a get() on the source future. code iterating
|
||||||
|
* over futures and assigning listeners will take the same amount of time as calling get() on each
|
||||||
|
* one, if using a within thread executor. This exposes an inefficiency which can make some use
|
||||||
|
* cases in google appengine impossible to achieve within the cutoff limits.
|
||||||
|
*
|
||||||
|
* @author Adrian Cole
|
||||||
|
*/
|
||||||
|
@Test(groups = "performance", enabled = false, sequential = true, testName = "FuturesComposePerformanceTest")
|
||||||
|
public class FuturesComposePerformanceTest {
|
||||||
|
private static final int FUDGE = 5;
|
||||||
|
private static final int COUNT = 100;
|
||||||
|
private static final int IO_DURATION = 50;
|
||||||
|
private static final int LISTENER_DURATION = 100;
|
||||||
|
|
||||||
|
ExecutorService ioFunctionExecutor = newCachedThreadPool();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When we use threadpools for both the chain and invoking listener, user experience is
|
||||||
|
* consistent.
|
||||||
|
*/
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenCachedThreadPoolIsUsedForChainAndListenerMaxDurationIsSumOfCallableAndListener()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
long expectedMax = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT * 4 + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = userthreads;
|
||||||
|
ExecutorService listenerExecutor = userthreads;
|
||||||
|
|
||||||
|
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When we use threadpools for the chain, but same thread for invoking listener, user experience
|
||||||
|
* is still consistent.
|
||||||
|
*/
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenCachedThreadPoolIsUsedForChainButSameThreadForListenerMaxDurationIsSumOfCallableAndListener()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
long expectedMax = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = userthreads;
|
||||||
|
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
|
||||||
|
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When using same thread for the chain, the futures are being called (get()) eagerly, resulting
|
||||||
|
* in the max duration being the sum of all i/o plus the cost of executing the listeners. In this
|
||||||
|
* case, listeners are executed in a different thread pool.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenSameThreadIsUsedForChainButCachedThreadPoolForListenerMaxDurationIsSumOfAllIOAndOneListener()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
long expectedMax = (IO_DURATION * COUNT) + LISTENER_DURATION;
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
ExecutorService listenerExecutor = userthreads;
|
||||||
|
|
||||||
|
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This case can be optimized for sure. The side effect of the eager get() is that all i/o must
|
||||||
|
* complete before *any* listeners are run. In this case, if you are inside google appengine and
|
||||||
|
* using same thread executors, worst experience is sum of all io duration plus the sum of all
|
||||||
|
* listener duration. An efficient implementation would call get() on the i/o future lazily. Such
|
||||||
|
* an impl would have a max duration of I/O + Listener * count.
|
||||||
|
*/
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenSameThreadIsUsedForChainAndListenerMaxDurationIsSumOfAllIOAndAllListeners()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
|
||||||
|
long expectedMax = (IO_DURATION * COUNT) + (LISTENER_DURATION * COUNT);
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
|
||||||
|
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkThresholdsUsingFuturesCompose(long expectedMin, long expectedMax, long expectedOverhead,
|
||||||
|
ExecutorService chainExecutor, final ExecutorService listenerExecutor) {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
Map<String, Future<Long>> responses = newHashMap();
|
||||||
|
for (int i = 0; i < COUNT; i++)
|
||||||
|
responses.put(i + "", Futures.compose(Futures.makeListenable(simultateIO(), chainExecutor),
|
||||||
|
new Function<Long, Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long apply(Long from) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(LISTENER_DURATION);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
propagate(e);
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
}, listenerExecutor));
|
||||||
|
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Future<Long> simultateIO() {
|
||||||
|
return ioFunctionExecutor.submit(new Callable<Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long call() throws Exception {
|
||||||
|
Thread.sleep(IO_DURATION);
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long getMaxIn(Map<String, Future<Long>> responses) {
|
||||||
|
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long apply(Future<Long> from) {
|
||||||
|
try {
|
||||||
|
return from.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
long time = Collections.max(Sets.newHashSet(collection));
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long getMinIn(Map<String, Future<Long>> responses) {
|
||||||
|
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long apply(Future<Long> from) {
|
||||||
|
try {
|
||||||
|
return from.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
long time = Collections.min(Sets.newHashSet(collection));
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start,
|
||||||
|
Map<String, Future<Long>> responses) {
|
||||||
|
long time = getMaxIn(responses) - start;
|
||||||
|
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
||||||
|
expectedMax, time);
|
||||||
|
|
||||||
|
time = getMinIn(responses) - start;
|
||||||
|
assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d",
|
||||||
|
expectedMin, time);
|
||||||
|
|
||||||
|
time = getMaxIn(responses) - start;
|
||||||
|
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
||||||
|
expectedMax, time);
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,16 +19,15 @@
|
||||||
|
|
||||||
package org.jclouds.concurrent;
|
package org.jclouds.concurrent;
|
||||||
|
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
|
import static com.google.common.collect.Maps.newHashMap;
|
||||||
import static java.util.concurrent.Executors.newCachedThreadPool;
|
import static java.util.concurrent.Executors.newCachedThreadPool;
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.CALLABLE_DURATION;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.COUNT;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.FUDGE;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.checkTimeThresholds;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.runCallables;
|
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -36,43 +35,216 @@ import java.util.concurrent.Future;
|
||||||
import org.jclouds.logging.Logger;
|
import org.jclouds.logging.Logger;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests behavior of FutureIterables
|
* Tests behavior of FutureIterables
|
||||||
*
|
*
|
||||||
* @author Adrian Cole
|
* @author Adrian Cole
|
||||||
*/
|
*/
|
||||||
@Test(enabled = false, groups = "performance", sequential = true)
|
@Test(groups = "performance", enabled = false, sequential = true, testName = "FutureIterablesTest")
|
||||||
public class FutureIterablesTest {
|
public class FutureIterablesTest {
|
||||||
|
ExecutorService ioFunctionExecutor = newCachedThreadPool();
|
||||||
|
|
||||||
@Test(enabled = false)
|
@Test(enabled = false)
|
||||||
public void testMakeListenableDoesntSerializeFutures() throws InterruptedException, ExecutionException {
|
public void testMakeListenableDoesntSerializeFutures() throws InterruptedException, ExecutionException {
|
||||||
long expectedMax = CALLABLE_DURATION;
|
long expectedMax = IO_DURATION;
|
||||||
long expectedMin = CALLABLE_DURATION;
|
long expectedMin = IO_DURATION;
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
ExecutorService callableExecutor = newCachedThreadPool();
|
|
||||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
Map<String, Future<Long>> responses = runCallables(callableExecutor, chainExecutor);
|
Map<String, Future<Long>> responses = runCallables(chainExecutor);
|
||||||
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(enabled = false)
|
@Test(enabled = false)
|
||||||
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException,
|
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException,
|
||||||
ExecutionException {
|
ExecutionException {
|
||||||
long expectedMax = CALLABLE_DURATION;
|
long expectedMax = IO_DURATION;
|
||||||
long expectedMin = CALLABLE_DURATION;
|
long expectedMin = IO_DURATION;
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
ExecutorService callableExecutor = newCachedThreadPool();
|
|
||||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
Map<String, Future<Long>> responses = runCallables(callableExecutor, chainExecutor);
|
Map<String, Future<Long>> responses = runCallables(chainExecutor);
|
||||||
Map<String, Exception> exceptions = awaitCompletion(responses, MoreExecutors.sameThreadExecutor(), null,
|
Map<String, Exception> exceptions = awaitCompletion(responses, MoreExecutors.sameThreadExecutor(), null,
|
||||||
Logger.CONSOLE, "test same thread");
|
Logger.CONSOLE, "test same thread");
|
||||||
assertEquals(exceptions.size(), 0);
|
assertEquals(exceptions.size(), 0);
|
||||||
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenCachedThreadPoolIsUsedForChainAndListenerMaxDurationIsSumOfCallableAndListener()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
long expectedMax = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT * 4 + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = userthreads;
|
||||||
|
ExecutorService listenerExecutor = userthreads;
|
||||||
|
|
||||||
|
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenCachedThreadPoolIsUsedForChainButSameThreadForListenerMaxDurationIsSumOfCallableAndListener()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
long expectedMax = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = userthreads;
|
||||||
|
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
|
||||||
|
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenSameThreadIsUsedForChainButCachedThreadPoolForListenerMaxDurationIsIOAndSumOfAllListeners()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
long expectedMax = IO_DURATION + (LISTENER_DURATION * COUNT);
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
ExecutorService listenerExecutor = userthreads;
|
||||||
|
|
||||||
|
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(enabled = false)
|
||||||
|
public void whenSameThreadIsUsedForChainAndListenerMaxDurationIsIOAndSumOfAllListeners()
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
|
||||||
|
long expectedMax = IO_DURATION + (LISTENER_DURATION * COUNT);
|
||||||
|
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||||
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
||||||
|
ExecutorService userthreads = newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
|
|
||||||
|
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||||
|
} finally {
|
||||||
|
userthreads.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final int FUDGE = 5;
|
||||||
|
public static final int COUNT = 100;
|
||||||
|
public static final int IO_DURATION = 50;
|
||||||
|
public static final int LISTENER_DURATION = 100;
|
||||||
|
|
||||||
|
private void checkThresholdsUsingCompose(long expectedMin, long expectedMax, long expectedOverhead,
|
||||||
|
ExecutorService chainExecutor, final ExecutorService listenerExecutor) {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
Map<String, Future<Long>> responses = newHashMap();
|
||||||
|
for (int i = 0; i < COUNT; i++)
|
||||||
|
responses.put(i + "", org.jclouds.concurrent.Futures.compose(org.jclouds.concurrent.Futures.makeListenable(
|
||||||
|
simultateIO(), chainExecutor), new Function<Long, Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long apply(Long from) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(LISTENER_DURATION);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
propagate(e);
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
}, listenerExecutor));
|
||||||
|
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Future<Long>> runCallables(ExecutorService chainExecutor) {
|
||||||
|
Map<String, Future<Long>> responses = newHashMap();
|
||||||
|
for (int i = 0; i < COUNT; i++)
|
||||||
|
responses.put(i + "", org.jclouds.concurrent.Futures.makeListenable(simultateIO(), chainExecutor));
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Future<Long> simultateIO() {
|
||||||
|
return ioFunctionExecutor.submit(new Callable<Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long call() throws Exception {
|
||||||
|
Thread.sleep(IO_DURATION);
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getMaxIn(Map<String, Future<Long>> responses) {
|
||||||
|
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long apply(Future<Long> from) {
|
||||||
|
try {
|
||||||
|
return from.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
long time = Collections.max(Sets.newHashSet(collection));
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getMinIn(Map<String, Future<Long>> responses) {
|
||||||
|
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long apply(Future<Long> from) {
|
||||||
|
try {
|
||||||
|
return from.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
long time = Collections.min(Sets.newHashSet(collection));
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start,
|
||||||
|
Map<String, Future<Long>> responses) {
|
||||||
|
long time = getMaxIn(responses) - start;
|
||||||
|
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
||||||
|
expectedMax, time);
|
||||||
|
|
||||||
|
time = getMinIn(responses) - start;
|
||||||
|
assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d",
|
||||||
|
expectedMin, time);
|
||||||
|
|
||||||
|
time = getMaxIn(responses) - start;
|
||||||
|
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
||||||
|
expectedMax, time);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,215 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
|
||||||
*
|
|
||||||
* ====================================================================
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* ====================================================================
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.jclouds.concurrent;
|
|
||||||
|
|
||||||
import static java.util.concurrent.Executors.newCachedThreadPool;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.CALLABLE_DURATION;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.COUNT;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.FUDGE;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.LISTENER_DURATION;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.checkThresholdsUsingConcurrentUtilsCompose;
|
|
||||||
import static org.jclouds.concurrent.FuturesTestingUtils.checkThresholdsUsingFuturesCompose;
|
|
||||||
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
import org.testng.annotations.Test;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* <p/>
|
|
||||||
* All of these tests simulate a future by invoking callables in a separate executor. The point of
|
|
||||||
* this test is to see what happens when we chain futures together.
|
|
||||||
*
|
|
||||||
* <ol>
|
|
||||||
* <li>{@code CALLABLE_DURATION} is the time that the source future spends doing work</li>
|
|
||||||
* <li>{@code LISTENER_DURATION} is the time that the attached listener or function</li>
|
|
||||||
* </ol>
|
|
||||||
*
|
|
||||||
* The execution time of a composed task within a composite should not be more than {@code
|
|
||||||
* CALLABLE_DURATION} + {@code LISTENER_DURATION} + overhead when a threadpool is used. This is
|
|
||||||
* because the listener should be invoked as soon as the result is available.
|
|
||||||
* <p/>
|
|
||||||
* The execution time of a composed task within a composite should not be more than {@code
|
|
||||||
* CALLABLE_DURATION} + {@code LISTENER_DURATION} * {@code COUNT} + overhead when caller thread is
|
|
||||||
* used for handling the listeners.
|
|
||||||
* <p/>
|
|
||||||
* ConcurrentUtils overcomes a shortcoming found in Google Guava r06, where Futures.compose eagerly
|
|
||||||
* issues a get() on the source future. This has the effect of serializing the futures as you
|
|
||||||
* iterate. It overcomes this by tagging the ExecutorService we associate with sameThread execution
|
|
||||||
* and lazy convert values accordingly.
|
|
||||||
*
|
|
||||||
* @author Adrian Cole
|
|
||||||
*/
|
|
||||||
@Test(enabled = false, groups = "performance", sequential = true)
|
|
||||||
public class FuturesComposePerformanceTest {
|
|
||||||
ExecutorService callableExecutor = newCachedThreadPool();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* When Futures.compose is
|
|
||||||
*/
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testFuturesCompose1() throws InterruptedException, ExecutionException {
|
|
||||||
long expectedMax = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT * 4 + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = userthreads;
|
|
||||||
ExecutorService listenerExecutor = userthreads;
|
|
||||||
|
|
||||||
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testFuturesCompose2() throws InterruptedException, ExecutionException {
|
|
||||||
long expectedMax = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = userthreads;
|
|
||||||
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
|
|
||||||
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testFuturesCompose3() throws InterruptedException, ExecutionException {
|
|
||||||
long expectedMax = (CALLABLE_DURATION * COUNT) + LISTENER_DURATION;
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
ExecutorService listenerExecutor = userthreads;
|
|
||||||
|
|
||||||
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testFuturesCompose4() throws InterruptedException, ExecutionException {
|
|
||||||
|
|
||||||
long expectedMax = (CALLABLE_DURATION * COUNT) + (LISTENER_DURATION * COUNT);
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
|
|
||||||
checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testConcurrentUtilsCompose1() throws InterruptedException, ExecutionException {
|
|
||||||
long expectedMax = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT * 4 + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = userthreads;
|
|
||||||
ExecutorService listenerExecutor = userthreads;
|
|
||||||
|
|
||||||
checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testConcurrentUtilsCompose2() throws InterruptedException, ExecutionException {
|
|
||||||
long expectedMax = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = userthreads;
|
|
||||||
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
|
|
||||||
checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testConcurrentUtilsCompose3() throws InterruptedException, ExecutionException {
|
|
||||||
long expectedMax = CALLABLE_DURATION + (LISTENER_DURATION * COUNT);
|
|
||||||
long expectedMin = CALLABLE_DURATION + LISTENER_DURATION;
|
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
ExecutorService listenerExecutor = userthreads;
|
|
||||||
|
|
||||||
checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(enabled = false)
|
|
||||||
public void testConcurrentUtilsCompose4() throws InterruptedException, ExecutionException {
|
|
||||||
|
|
||||||
long expectedMax = CALLABLE_DURATION + (LISTENER_DURATION * COUNT);
|
|
||||||
long expectedMin = CALLABLE_DURATION;
|
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
|
||||||
|
|
||||||
ExecutorService userthreads = newCachedThreadPool();
|
|
||||||
try {
|
|
||||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
|
||||||
|
|
||||||
checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor,
|
|
||||||
chainExecutor, listenerExecutor);
|
|
||||||
} finally {
|
|
||||||
userthreads.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,162 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
|
||||||
*
|
|
||||||
* ====================================================================
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* ====================================================================
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.jclouds.concurrent;
|
|
||||||
|
|
||||||
import static com.google.common.base.Throwables.propagate;
|
|
||||||
import static com.google.common.collect.Maps.newHashMap;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests behavior of ConcurrentUtils
|
|
||||||
*
|
|
||||||
* @author Adrian Cole
|
|
||||||
*/
|
|
||||||
public class FuturesTestingUtils {
|
|
||||||
public static final int FUDGE = 5;
|
|
||||||
public static final int COUNT = 100;
|
|
||||||
public static final int CALLABLE_DURATION = 50;
|
|
||||||
public static final int LISTENER_DURATION = 100;
|
|
||||||
|
|
||||||
public static void checkThresholdsUsingFuturesCompose(long expectedMin, long expectedMax, long expectedOverhead,
|
|
||||||
ExecutorService callableExecutor, ExecutorService chainExecutor, final ExecutorService listenerExecutor) {
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
Map<String, Future<Long>> responses = newHashMap();
|
|
||||||
for (int i = 0; i < COUNT; i++)
|
|
||||||
responses.put(i + "", Futures.compose(createFuture(callableExecutor, chainExecutor),
|
|
||||||
new Function<Long, Long>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long apply(Long from) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(LISTENER_DURATION);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
propagate(e);
|
|
||||||
}
|
|
||||||
return System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
}, listenerExecutor));
|
|
||||||
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void checkThresholdsUsingConcurrentUtilsCompose(long expectedMin, long expectedMax,
|
|
||||||
long expectedOverhead, ExecutorService callableExecutor, ExecutorService chainExecutor,
|
|
||||||
final ExecutorService listenerExecutor) {
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
Map<String, Future<Long>> responses = newHashMap();
|
|
||||||
for (int i = 0; i < COUNT; i++)
|
|
||||||
responses.put(i + "", org.jclouds.concurrent.Futures.compose(createFuture(callableExecutor, chainExecutor), new Function<Long, Long>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long apply(Long from) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(LISTENER_DURATION);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
propagate(e);
|
|
||||||
}
|
|
||||||
return System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
}, listenerExecutor));
|
|
||||||
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Map<String, Future<Long>> runCallables(ExecutorService callableExecutor, ExecutorService chainExecutor) {
|
|
||||||
Map<String, Future<Long>> responses = newHashMap();
|
|
||||||
for (int i = 0; i < COUNT; i++)
|
|
||||||
responses.put(i + "", createFuture(callableExecutor, chainExecutor));
|
|
||||||
return responses;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ListenableFuture<Long> createFuture(ExecutorService callableExecutor, ExecutorService chainExecutor) {
|
|
||||||
return org.jclouds.concurrent.Futures.makeListenable(callableExecutor.submit(new Callable<Long>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long call() throws Exception {
|
|
||||||
Thread.sleep(CALLABLE_DURATION);
|
|
||||||
return System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
}), chainExecutor);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static long getMaxIn(Map<String, Future<Long>> responses) {
|
|
||||||
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long apply(Future<Long> from) {
|
|
||||||
try {
|
|
||||||
return from.get();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
long time = Collections.max(Sets.newHashSet(collection));
|
|
||||||
return time;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static long getMinIn(Map<String, Future<Long>> responses) {
|
|
||||||
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long apply(Future<Long> from) {
|
|
||||||
try {
|
|
||||||
return from.get();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
long time = Collections.min(Sets.newHashSet(collection));
|
|
||||||
return time;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start,
|
|
||||||
Map<String, Future<Long>> responses) {
|
|
||||||
long time = getMaxIn(responses) - start;
|
|
||||||
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
|
||||||
expectedMax, time);
|
|
||||||
|
|
||||||
time = getMinIn(responses) - start;
|
|
||||||
assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d",
|
|
||||||
expectedMin, time);
|
|
||||||
|
|
||||||
time = getMaxIn(responses) - start;
|
|
||||||
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
|
||||||
expectedMax, time);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -108,6 +108,7 @@ import org.jclouds.io.PayloadEnclosing;
|
||||||
import org.jclouds.io.Payloads;
|
import org.jclouds.io.Payloads;
|
||||||
import org.jclouds.logging.config.NullLoggingModule;
|
import org.jclouds.logging.config.NullLoggingModule;
|
||||||
import org.jclouds.rest.AsyncClientFactory;
|
import org.jclouds.rest.AsyncClientFactory;
|
||||||
|
import org.jclouds.rest.AuthorizationException;
|
||||||
import org.jclouds.rest.BaseRestClientTest;
|
import org.jclouds.rest.BaseRestClientTest;
|
||||||
import org.jclouds.rest.ConfiguresRestClient;
|
import org.jclouds.rest.ConfiguresRestClient;
|
||||||
import org.jclouds.rest.InvocationContext;
|
import org.jclouds.rest.InvocationContext;
|
||||||
|
@ -2179,6 +2180,10 @@ public class RestAnnotationProcessorTest extends BaseRestClientTest {
|
||||||
@Provides
|
@Provides
|
||||||
Set<String> foo();
|
Set<String> foo();
|
||||||
|
|
||||||
|
@Named("exception")
|
||||||
|
@Provides
|
||||||
|
Set<String> exception();
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
@Path("/")
|
@Path("/")
|
||||||
void oneForm(@PathParam("bucket") String path);
|
void oneForm(@PathParam("bucket") String path);
|
||||||
|
@ -2197,6 +2202,12 @@ public class RestAnnotationProcessorTest extends BaseRestClientTest {
|
||||||
assertEquals(set, ImmutableSet.of("bar"));
|
assertEquals(set, ImmutableSet.of("bar"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = AuthorizationException.class)
|
||||||
|
public void testProvidesWithGenericQualifiedAuthorizationException() throws SecurityException,
|
||||||
|
NoSuchMethodException, UnsupportedEncodingException {
|
||||||
|
injector.getInstance(AsyncClientFactory.class).create(TestClassForm.class).exception();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuildOneClassForm() throws SecurityException, NoSuchMethodException, UnsupportedEncodingException {
|
public void testBuildOneClassForm() throws SecurityException, NoSuchMethodException, UnsupportedEncodingException {
|
||||||
Method oneForm = TestClassForm.class.getMethod("oneForm", String.class);
|
Method oneForm = TestClassForm.class.getMethod("oneForm", String.class);
|
||||||
|
@ -2252,6 +2263,13 @@ public class RestAnnotationProcessorTest extends BaseRestClientTest {
|
||||||
URI.create("http://localhost:1111"));
|
URI.create("http://localhost:1111"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
@Provides
|
||||||
|
@Named("exception")
|
||||||
|
Set<String> exception() {
|
||||||
|
throw new AuthorizationException();
|
||||||
|
}
|
||||||
|
|
||||||
}));
|
}));
|
||||||
|
|
||||||
injector = createContextBuilder(contextSpec).buildInjector();
|
injector = createContextBuilder(contextSpec).buildInjector();
|
||||||
|
|
|
@ -82,8 +82,7 @@ public class Throwables2Test {
|
||||||
Exception e = new TestException();
|
Exception e = new TestException();
|
||||||
assertEquals(returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] { TestException.class }, e),
|
assertEquals(returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] { TestException.class }, e),
|
||||||
e);
|
e);
|
||||||
assertEquals(
|
assertEquals(returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] { TestException.class },
|
||||||
returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] { TestException.class },
|
|
||||||
new RuntimeException(e)), e);
|
new RuntimeException(e)), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,6 +116,13 @@ public class Throwables2Test {
|
||||||
returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] {}, new RuntimeException(e));
|
returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] {}, new RuntimeException(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = AuthorizationException.class)
|
||||||
|
public void testPropagateProvisionExceptionAuthorizationException() throws Exception {
|
||||||
|
Exception e = new AuthorizationException();
|
||||||
|
returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] {}, new ProvisionException(ImmutableSet.of(new Message(
|
||||||
|
ImmutableList.of(), "Error in custom provider",e))));
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expectedExceptions = InsufficientResourcesException.class)
|
@Test(expectedExceptions = InsufficientResourcesException.class)
|
||||||
public void testPropagateStandardExceptionInsufficientResourcesException() throws Exception {
|
public void testPropagateStandardExceptionInsufficientResourcesException() throws Exception {
|
||||||
Exception e = new InsufficientResourcesException();
|
Exception e = new InsufficientResourcesException();
|
||||||
|
@ -170,6 +176,7 @@ public class Throwables2Test {
|
||||||
Exception e = new HttpResponseException("goo", createNiceMock(HttpCommand.class), null);
|
Exception e = new HttpResponseException("goo", createNiceMock(HttpCommand.class), null);
|
||||||
returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] {}, new RuntimeException(e));
|
returnFirstExceptionIfInListOrThrowStandardExceptionOrCause(new Class[] {}, new RuntimeException(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
static class TestException extends Exception {
|
static class TestException extends Exception {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue