AWS S3 sequential multipart upload strategy, newly added files.

This commit is contained in:
Tibor Kiss 2011-03-04 23:29:39 +01:00
parent ed27cbc7c0
commit 9bbfcad144
7 changed files with 613 additions and 0 deletions

View File

@ -0,0 +1,126 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.stream.ChunkedFile;
/**
*
*
*
*
* @author Tibor Kiss
*/
public class ChunkedFileInputStream extends InputStream {
private static final int CHUNK_SIZE = 8192;
private ChunkedFile chunks;
private ChannelBuffer chunk;
private IOException ex;
public ChunkedFileInputStream(String filename, long offset, long length) {
this(new File(filename), offset, length);
}
public ChunkedFileInputStream(File file, long offset, long length) {
try {
this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE);
} catch (IOException ex) {
this.ex = ex;
}
}
private ChannelBuffer getChunk() throws Exception {
if (ex != null) {
throw ex;
}
if (chunk == null) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
}
if (chunk != null) {
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
if (chunk.readableBytes() < 1) {
return null;
}
}
} else {
return null;
}
return chunk;
}
@Override
public int read() throws IOException {
try {
ChannelBuffer chunk = getChunk();
if (chunk == null)
return -1;
if (chunk.readableBytes() < 1)
return -1;
int readIndex = chunk.readerIndex();
byte abyte = chunk.getByte(readIndex);
chunk.readerIndex(readIndex + 1);
return (int)abyte;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
ChannelBuffer chunk = getChunk();
if (chunk == null)
return -1;
int readable = chunk.readableBytes();
if (readable < 1)
return -1;
if (readable > len) {
readable = len;
}
int readIndex = chunk.readerIndex();
chunk.getBytes(readIndex, b, off, readable);
chunk.readerIndex(readIndex + readable);
return readable;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public void close() throws IOException {
try {
chunks.close();
} catch (Exception e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import java.io.File;
import java.io.InputStream;
public class ChunkedFilePayload extends FilePayload {
private int part;
private long chunkOffset;
private long chunkSize;
public ChunkedFilePayload(File content) {
this(content, 1, 0, content.length());
}
public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) {
super(content);
this.part = part;
this.chunkOffset = chunkOffset;
this.chunkSize = chunkSize;
}
public int getPart() {
return part;
}
@Override
public InputStream getInput() {
return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize);
}
}

View File

@ -0,0 +1,67 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import org.jclouds.Constants;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.s3.blobstore.S3AsyncBlobStore;
import org.jclouds.s3.blobstore.functions.BlobToObject;
import org.jclouds.s3.blobstore.functions.BucketToResourceList;
import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
/**
*
* @author Tibor Kiss
*/
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
@Inject
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3AsyncClient async, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions,
blob2Object, object2BlobMd, fetchBlobMetadataProvider);
}
}

View File

@ -0,0 +1,76 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Provider;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.functions.BlobToObject;
import org.jclouds.s3.blobstore.functions.BucketToResourceList;
import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
/**
* Proived AWS S3 specific extensions.
*
* @author Tibor Kiss
*/
public class AWSS3BlobStore extends S3BlobStore {
private MultipartUploadStrategy multipartUploadStrategy;
@Inject
AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions,
bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd,
fetchBlobMetadataProvider);
multipartUploadStrategy = new SequentialMultipartUploadStrategy(this);
}
/* (non-Javadoc)
* @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob)
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return multipartUploadStrategy.execute(container, blob);
}
}

View File

@ -0,0 +1,59 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.config;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobRequestSigner;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.attr.ConsistencyModel;
import org.jclouds.blobstore.config.BlobStoreMapModule;
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
import org.jclouds.location.config.RegionsLocationModule;
import org.jclouds.s3.blobstore.S3BlobRequestSigner;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
/**
*
*
* @author Tibor Kiss
*/
public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
@Override
protected void configure() {
install(new BlobStoreMapModule());
install(new RegionsLocationModule());
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
}).in(Scopes.SINGLETON);
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
bindBucketLocationStrategy();
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.strategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.domain.Blob;
import com.google.inject.ImplementedBy;
/**
* @see <a href="http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?qfacts.html"
*
* @author Tibor Kiss
*/
@ImplementedBy(SequentialMultipartUploadStrategy.class)
public interface MultipartUploadStrategy {
/* Maximum number of parts per upload */
public static final int MAX_NUMBER_OF_PARTS = 10000;
/* Maximum number of parts returned for a list parts request */
public static final int MAX_LIST_PARTS_RETURNED = 1000;
/* Maximum number of multipart uploads returned in a list multipart uploads request */
public static final int MAX_LIST_MPU_RETURNED = 1000;
/*
* part size 5 MB to 5 GB, last part can be < 5 MB
*/
public static final long MIN_PART_SIZE = 5242880L;
public static final long MAX_PART_SIZE = 5368709120L;
String execute(String container, Blob blob);
}

View File

@ -0,0 +1,185 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.strategy.internal;
import static org.jclouds.io.Payloads.newChunkedFilePayload;
import java.io.File;
import java.util.Map;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.io.Payload;
import org.jclouds.io.payloads.FilePayload;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import com.google.common.collect.Maps;
/**
* Provides a sequential multipart upload strategy.
*
* The file partitioning algorithm:
*
* The default partition size we choose is 32mb.
* A multiple of this default partition size is used.
* The number of parts first grows to a chosen magnitude (for example 100 parts),
* then it grows the partition size instead of number of partitions.
* When we reached the maximum part size, then again it starts to grow the number of partitions.
*
* @author Tibor Kiss
*/
public class SequentialMultipartUploadStrategy implements
MultipartUploadStrategy {
private final long DEFAULT_PART_SIZE = 33554432; // 32mb
private final int MAGNITUDE_BASE = 100;
private final AWSS3BlobStore ablobstore;
// calculated only once, but not from the constructor
private volatile long parts; // required number of parts with chunkSize
private volatile long chunkSize;
private volatile long remaining; // number of bytes remained for the last part
// sequentially updated values
private volatile int part;
private volatile long chunkOffset;
private volatile long copied;
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) {
this.ablobstore = ablobstore;
}
protected long calculateChunkSize(long length) {
long unitPartSize = DEFAULT_PART_SIZE; // first try with default part size
long parts = length / unitPartSize;
long partSize = unitPartSize;
int magnitude = (int) (parts / MAGNITUDE_BASE);
if (magnitude > 0) {
partSize = magnitude * unitPartSize;
if (partSize > MAX_PART_SIZE) {
partSize = MAX_PART_SIZE;
unitPartSize = MAX_PART_SIZE;
}
parts = length / partSize;
}
if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
// cannot be split
unitPartSize = MIN_PART_SIZE; // take the minimum part size
parts = length / unitPartSize;
}
if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
// covering
}
long remainder = length % unitPartSize;
if (remainder == 0 && parts > 0) {
parts -= 1;
}
this.chunkSize = partSize;
this.parts = parts;
this.remaining = length - partSize * parts;
System.out.println(" " + length + " bytes partitioned in " + parts
+ " parts of part size: " + chunkSize + ", remaining: "
+ remaining + (remaining > MAX_PART_SIZE ? " overflow!" : ""));
return this.chunkSize;
}
protected long getParts() {
return parts;
}
protected int getNextPart() {
return ++part;
}
protected void addCopied(long copied) {
this.copied += copied;
}
protected long getNextChunkOffset() {
long next = chunkOffset;
chunkOffset += getChunkSize();
return next;
}
protected long getChunkSize() {
return chunkSize;
}
protected long getRemaining() {
return remaining;
}
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId,
int part, File file, long chunkOffset, long chunkSize) {
Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize);
chunkedPart.getContentMetadata().setContentLength(chunkSize);
//chukedPayload.getContentMetadata().setContentMD5(???);
String eTag = null;
try {
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
} catch (KeyNotFoundException e) {
// note that because of eventual consistency, the upload id may not be present yet
// we may wish to add this condition to the retry handler
// we may also choose to implement ListParts and wait for the uploadId to become
// available there.
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
}
return eTag;
}
@Override
public String execute(String container, Blob blob) {
Payload payload = blob.getPayload();
if (payload instanceof FilePayload) {
String key = blob.getMetadata().getName();
File file = FilePayload.class.cast(payload).getRawContent();
calculateChunkSize(file.length());
long parts = getParts();
if (parts > 0) {
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
String uploadId = client.initiateMultipartUpload(container,
ObjectMetadataBuilder.create().key(key).build()); // TODO md5
Map<Integer, String> etags = Maps.newHashMap();
int part;
while ((part = getNextPart()) <= getParts()) {
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), chunkSize);
etags.put(new Integer(part), eTag);
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), remaining);
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else {
return ablobstore.putBlob(container, blob);
}
} else {
return ablobstore.putBlob(container, blob);
}
}
}