added parallel file upload test to blobstore

This commit is contained in:
Adrian Cole 2011-04-07 12:26:47 -07:00
parent ba40e8a7b5
commit 25eb1581ea
3 changed files with 97 additions and 2 deletions

View File

@ -39,10 +39,13 @@ import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
import javax.ws.rs.core.MediaType;
@ -61,6 +64,7 @@ import org.jclouds.encryption.internal.JCECrypto;
import org.jclouds.http.BaseJettyTest;
import org.jclouds.http.HttpResponseException;
import org.jclouds.io.InputSuppliers;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.WriteTo;
import org.jclouds.io.payloads.StreamingPayload;
@ -78,6 +82,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
/**
@ -111,6 +116,59 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
return temp;
}
/**
* Attempt to capture the issue detailed in
* http://groups.google.com/group/jclouds/browse_thread/thread/4a7c8d58530b287f
*/
@Test(groups = { "integration", "live" })
public void testPutFileParallel() throws InterruptedException, IOException {
File payloadFile = File.createTempFile("testPutFileParallel", "png");
Files.copy(InputSuppliers.of(getClass().getResource("/testimg.png").openStream()), payloadFile);
payloadFile.deleteOnExit();
final Payload testPayload = Payloads.newFilePayload(payloadFile);
final byte[] md5 = CryptoStreams.md5(testPayload);
testPayload.getContentMetadata().setContentType("image/png");
final AtomicInteger blobCount = new AtomicInteger();
final String container = getContainerName();
try {
Map<Integer, Future<?>> responses = Maps.newHashMap();
for (int i = 0; i < 10; i++) {
responses.put(i, this.exec.submit(new Callable<Void>() {
@SuppressWarnings("deprecation")
@Override
public Void call() throws Exception {
String name = blobCount.incrementAndGet() + "";
Blob blob = context.getBlobStore().newBlob(name);
blob.setPayload(testPayload);
context.getBlobStore().putBlob(container, blob);
assertConsistencyAwareBlobExists(container, name);
blob = context.getBlobStore().getBlob(container, name);
assert Arrays.equals(CryptoStreams.md5(blob.getPayload()), md5) : String.format(
"md5 didn't match on %s/%s", container, name);
context.getBlobStore().removeBlob(container, name);
assertConsistencyAwareBlobDoesntExist(container, name);
return null;
}
}));
}
Map<Integer, Exception> exceptions = awaitCompletion(responses, exec, 30000l, Logger.CONSOLE,
"putFileParallel");
assert exceptions.size() == 0 : exceptions;
} finally {
returnContainer(container);
}
}
@Test(groups = { "integration", "live" })
public void testBigFileGets() throws InterruptedException, IOException {
final String expectedContentDisposition = "attachment; filename=constit.txt";

View File

@ -26,8 +26,8 @@ import static org.testng.Assert.assertEquals;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
@ -54,6 +54,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.inject.Module;
@ -288,7 +289,29 @@ public class BaseBlobStoreIntegrationTest {
public void run() {
try {
assert context.getBlobStore().countBlobs(containerName) == count : String.format(
"expected only %d values in %s: %s", count, containerName, Sets.newHashSet(Iterables.transform(
"expected only %d values in %s: %s", count, containerName, ImmutableSet.copyOf(Iterables
.transform(context.getBlobStore().list(containerName),
new Function<StorageMetadata, String>() {
public String apply(StorageMetadata from) {
return from.getName();
}
})));
} catch (Exception e) {
Throwables.propagateIfPossible(e);
}
}
});
}
protected void assertConsistencyAwareBlobExists(final String containerName, final String name)
throws InterruptedException {
assertConsistencyAware(new Runnable() {
public void run() {
try {
assert context.getBlobStore().blobExists(containerName, name) : String.format(
"could not find %s in %s: %s", name, containerName, ImmutableSet.copyOf(Iterables.transform(
context.getBlobStore().list(containerName), new Function<StorageMetadata, String>() {
public String apply(StorageMetadata from) {
@ -303,6 +326,20 @@ public class BaseBlobStoreIntegrationTest {
});
}
protected void assertConsistencyAwareBlobDoesntExist(final String containerName, final String name)
throws InterruptedException {
assertConsistencyAware(new Runnable() {
public void run() {
try {
assert !context.getBlobStore().blobExists(containerName, name) : String.format("found %s in %s", name,
containerName);
} catch (Exception e) {
Throwables.propagateIfPossible(e);
}
}
});
}
public String getContainerName() throws InterruptedException {
String containerName = containerNames.poll(30, TimeUnit.SECONDS);
assert containerName != null : "unable to get a container for the test";

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB