Issue 430: abortMultipartUpload when handling uploadPart errors, added

a unit test for this abortMultipartUpload.
This commit is contained in:
Tibor Kiss 2011-03-17 11:14:28 +01:00
parent 4ec124d264
commit 70ec28146e
2 changed files with 112 additions and 18 deletions

View File

@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Inject;
@ -38,6 +39,7 @@ import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.logging.Logger;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import org.jclouds.util.Throwables2;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
@ -157,7 +159,7 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg
Payload chunkedPart) {
String eTag = null;
try {
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
} catch (KeyNotFoundException e) {
// note that because of eventual consistency, the upload id may not be present yet
// we may wish to add this condition to the retry handler
@ -175,23 +177,35 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg
calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength());
long parts = getParts();
if (parts > 0) {
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO
// md5
SortedMap<Integer, String> etags = Maps.newTreeMap();
int part;
while ((part = getNextPart()) <= getParts()) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize));
etags.put(new Integer(part), eTag);
AWSS3Client client = (AWSS3Client) ablobstore.getContext()
.getProviderSpecificContext().getApi();
String uploadId = client.initiateMultipartUpload(container,
ObjectMetadataBuilder.create().key(key).build()); // TODO md5
try {
SortedMap<Integer, String> etags = Maps.newTreeMap();
int part;
while ((part = getNextPart()) <= getParts()) {
String eTag = prepareUploadPart(client, container, key,
uploadId, part, slicer.slice(blob.getPayload(),
getNextChunkOffset(), chunkSize));
etags.put(new Integer(part), eTag);
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key,
uploadId, part, slicer.slice(blob.getPayload(),
getNextChunkOffset(), remaining));
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} catch (Exception ex) {
RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
if (rtex == null) {
rtex = new RuntimeException(ex);
}
client.abortMultipartUpload(container, key, uploadId);
throw rtex;
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining));
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else {
return ablobstore.putBlob(container, blob);
}

View File

@ -20,17 +20,24 @@
package org.jclouds.aws.s3.blobstore.strategy.internal;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.classextension.EasyMock.createMock;
import static org.easymock.classextension.EasyMock.replay;
import static org.easymock.classextension.EasyMock.verify;
import static org.testng.Assert.fail;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.easymock.EasyMock;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.MutableBlobMetadata;
import org.jclouds.concurrent.Timeout;
import org.jclouds.io.MutableContentMetadata;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
@ -38,6 +45,7 @@ import org.jclouds.rest.RestContext;
import org.jclouds.rest.internal.RestContextImpl;
import org.jclouds.s3.domain.ObjectMetadata;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import org.jclouds.util.Throwables2;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
@ -49,7 +57,7 @@ import com.google.common.collect.Maps;
*/
@Test(groups = "unit")
public class SequentialMultipartUploadStrategyTest {
@Test
public void testWithTwoParts() {
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
@ -112,4 +120,76 @@ public class SequentialMultipartUploadStrategyTest {
verify(client);
verify(ometa);
}
@Test
public void testWithTimeout() {
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
PayloadSlicer slicer = createMock(PayloadSlicer.class);
String container = "container";
String key = "mpu-test";
Blob blob = createMock(Blob.class);
MutableBlobMetadata blobMeta = createMock(MutableBlobMetadata.class);
Payload payload = createMock(Payload.class);
MutableContentMetadata contentMeta = createMock(MutableContentMetadata.class);
BlobStoreContext context = createMock(BlobStoreContext.class);
@SuppressWarnings("unchecked")
RestContext<Object, Object> psc = createMock(RestContextImpl.class);
AWSS3Client client = createMock(AWSS3Client.class);
ObjectMetadata ometa = createMock(ObjectMetadata.class);
String uploadId = "uploadId";
long chunkSize = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE;
long remaining = 100L;
SortedMap<Integer, String> etags = Maps.newTreeMap();
etags.put(new Integer(1), "eTag1");
etags.put(new Integer(2), "eTag2");
expect(blob.getMetadata()).andReturn(blobMeta).atLeastOnce();
expect(blobMeta.getName()).andReturn(key).atLeastOnce();
expect(blob.getPayload()).andReturn(payload).atLeastOnce();
expect(payload.getContentMetadata()).andReturn(contentMeta).atLeastOnce();
expect(contentMeta.getContentLength()).andReturn(new Long(chunkSize + remaining));
expect(ablobStore.getContext()).andReturn(context).atLeastOnce();
expect(context.getProviderSpecificContext()).andReturn(psc).atLeastOnce();
expect(psc.getApi()).andReturn(client).atLeastOnce();
expect(client.initiateMultipartUpload(container, new ObjectMetadataBuilder().key(key).build())).andReturn("uploadId").atLeastOnce();
expect(slicer.slice(payload, 0, chunkSize)).andReturn(payload).atLeastOnce();
expect(client.uploadPart(container, key, 1, uploadId, payload)).andReturn("eTag1").atLeastOnce();
expect(slicer.slice(payload, chunkSize, remaining)).andReturn(payload).atLeastOnce();
expect(client.uploadPart(container, key, 2, uploadId, payload)).andThrow(new RuntimeException(new TimeoutException()));
client.abortMultipartUpload(container, key, uploadId);
expectLastCall().atLeastOnce();
replay(ablobStore);
replay(slicer);
replay(blob);
replay(blobMeta);
replay(payload);
replay(contentMeta);
replay(context);
replay(psc);
replay(client);
replay(ometa);
SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer);
try {
strategy.execute(container, blob);
fail("Should throw RuntimeException with TimeoutException cause!");
} catch (RuntimeException rtex) {
TimeoutException timeout = Throwables2.getFirstThrowableOfType(rtex, TimeoutException.class);
if (timeout == null) {
throw rtex;
}
}
verify(ablobStore);
verify(slicer);
verify(blob);
verify(blobMeta);
verify(payload);
verify(contentMeta);
verify(context);
verify(psc);
verify(client);
verify(ometa);
}
}