From 9bbfcad144b013134e4cff5a40e40039ad12a087 Mon Sep 17 00:00:00 2001
From: Tibor Kiss
Date: Fri, 4 Mar 2011 23:29:39 +0100
Subject: [PATCH] AWS S3 sequential multipart upload strategy, newly added files.

---
 .../io/payloads/ChunkedFileInputStream.java        | 126 ++++++++++++
 .../io/payloads/ChunkedFilePayload.java            |  50 +++++
 .../aws/s3/blobstore/AWSS3AsyncBlobStore.java      |  67 +++++++
 .../aws/s3/blobstore/AWSS3BlobStore.java           |  76 +++++++
 .../config/AWSS3BlobStoreContextModule.java        |  59 ++++++
 .../strategy/MultipartUploadStrategy.java          |  50 +++++
 .../SequentialMultipartUploadStrategy.java         | 185 ++++++++++++++++++
 7 files changed, 613 insertions(+)
 create mode 100644 core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java
 create mode 100644 core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java
 create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java
 create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java
 create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java
 create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java
 create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java

diff --git a/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java b/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java
new file mode 100644
index 0000000000..18649dbb4f
--- /dev/null
+++ b/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java
@@ -0,0 +1,126 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.io.payloads;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+/**
+ * An {@link InputStream} over a byte range of a file, read in fixed-size
+ * chunks through Netty's {@link ChunkedFile}.
+ *
+ * @author Tibor Kiss
+ */
+public class ChunkedFileInputStream extends InputStream {
+
+   private static final int CHUNK_SIZE = 8192;
+
+   private ChunkedFile chunks;
+   private ChannelBuffer chunk;
+
+   private IOException ex;
+
+   public ChunkedFileInputStream(String filename, long offset, long length) {
+      this(new File(filename), offset, length);
+   }
+
+   public ChunkedFileInputStream(File file, long offset, long length) {
+      try {
+         this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE);
+      } catch (IOException ex) {
+         this.ex = ex;
+      }
+   }
+
+   private ChannelBuffer getChunk() throws Exception {
+      if (ex != null) {
+         throw ex;
+      }
+      if (chunk == null) {
+         chunk = ChannelBuffer.class.cast(chunks.nextChunk());
+      }
+      if (chunk != null) {
+         if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
+            chunk = ChannelBuffer.class.cast(chunks.nextChunk());
+            if (chunk.readableBytes() < 1) {
+               return null;
+            }
+         }
+      } else {
+         return null;
+      }
+      return chunk;
+   }
+
+   @Override
+   public int read() throws IOException {
+      try {
+         ChannelBuffer chunk = getChunk();
+         if (chunk == null)
+            return -1;
+         if (chunk.readableBytes() < 1)
+            return -1;
+         int readIndex = chunk.readerIndex();
+         byte abyte = chunk.getByte(readIndex);
+         chunk.readerIndex(readIndex + 1);
+         return abyte & 0xFF; // mask the sign so values >= 0x80 are not mistaken for EOF
+      } catch (Exception e) {
+         throw new IOException(e);
+      }
+   }
+
+   @Override
+   public int read(byte[] b, int off, int len) throws IOException {
+      try {
+         ChannelBuffer chunk = getChunk();
+         if (chunk == null)
+            return -1;
+         int readable = chunk.readableBytes();
+         if (readable < 1)
+            return -1;
+         if (readable > len) {
+            readable = len;
+         }
+         int readIndex = chunk.readerIndex();
+         chunk.getBytes(readIndex, b, off, readable);
+         chunk.readerIndex(readIndex + readable);
+         return readable;
+      } catch (Exception e) {
+         throw new IOException(e);
+      }
+   }
+
+   @Override
+   public void close() throws IOException {
+      try {
+         chunks.close();
+      } catch (Exception e) {
+         throw new IOException(e);
+      }
+   }
+
+}
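
For reference, a minimal sketch of how the new ChunkedFileInputStream can be
exercised on its own, reading an arbitrary byte range of a local file (the
path, offset and length below are made-up illustration values):

   import java.io.IOException;

   import org.jclouds.io.payloads.ChunkedFileInputStream;

   public class ChunkedFileInputStreamExample {
      public static void main(String[] args) throws IOException {
         // read 1 KB starting at offset 4096 of a hypothetical local file
         ChunkedFileInputStream in = new ChunkedFileInputStream("/tmp/example.bin", 4096, 1024);
         try {
            byte[] buf = new byte[512];
            int n, total = 0;
            while ((n = in.read(buf, 0, buf.length)) != -1)
               total += n; // the stream ends once the requested range is exhausted
            System.out.println("read " + total + " bytes");
         } finally {
            in.close();
         }
      }
   }
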
diff --git a/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java b/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java
new file mode 100644
index 0000000000..27d6983a19
--- /dev/null
+++ b/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.io.payloads;
+
+import java.io.File;
+import java.io.InputStream;
+
+/**
+ * A {@link FilePayload} that exposes a single part (chunk) of the underlying
+ * file through {@link #getInput()}.
+ *
+ * @author Tibor Kiss
+ */
+public class ChunkedFilePayload extends FilePayload {
+
+   private int part;
+   private long chunkOffset;
+   private long chunkSize;
+
+   public ChunkedFilePayload(File content) {
+      this(content, 1, 0, content.length());
+   }
+
+   public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) {
+      super(content);
+      this.part = part;
+      this.chunkOffset = chunkOffset;
+      this.chunkSize = chunkSize;
+   }
+
+   public int getPart() {
+      return part;
+   }
+
+   @Override
+   public InputStream getInput() {
+      return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize);
+   }
+}
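
A sketch (file name and sizes are illustrative) of how ChunkedFilePayload maps
one part onto a region of the underlying file, here part 2 of a file split
into 32 MB chunks:

   import java.io.File;

   import org.jclouds.io.payloads.ChunkedFilePayload;

   public class ChunkedFilePayloadExample {
      public static void main(String[] args) {
         File file = new File("/tmp/large-upload.bin"); // hypothetical large file
         long partSize = 33554432L; // 32 MB, matching the default part size used further below
         int part = 2;
         long offset = (part - 1) * partSize; // part numbering starts at 1
         // getInput() on this payload only yields bytes [offset, offset + partSize)
         ChunkedFilePayload payload = new ChunkedFilePayload(file, part, offset, partSize);
         payload.getContentMetadata().setContentLength(partSize);
         System.out.println("part " + payload.getPart() + " starts at byte " + offset);
      }
   }
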
diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java
new file mode 100644
index 0000000000..6b321bf4ed
--- /dev/null
+++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.aws.s3.blobstore;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+
+import org.jclouds.Constants;
+import org.jclouds.aws.s3.AWSS3AsyncClient;
+import org.jclouds.aws.s3.AWSS3Client;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
+import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
+import org.jclouds.blobstore.util.BlobUtils;
+import org.jclouds.collect.Memoized;
+import org.jclouds.domain.Location;
+import org.jclouds.s3.blobstore.S3AsyncBlobStore;
+import org.jclouds.s3.blobstore.functions.BlobToObject;
+import org.jclouds.s3.blobstore.functions.BucketToResourceList;
+import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
+import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
+import org.jclouds.s3.blobstore.functions.ObjectToBlob;
+import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Async counterpart of {@link AWSS3BlobStore}.
+ *
+ * @author Tibor Kiss
+ */
+public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
+
+   @Inject
+   public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
+            @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
+            @Memoized Supplier<Set<? extends Location>> locations, AWSS3AsyncClient async, AWSS3Client sync,
+            BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
+            BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
+            BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
+            Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
+      super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
+               container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions,
+               blob2Object, object2BlobMd, fetchBlobMetadataProvider);
+   }
+
+}
diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java
new file mode 100644
index 0000000000..a2d7582e46
--- /dev/null
+++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.aws.s3.blobstore;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import org.jclouds.aws.s3.AWSS3Client;
+import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
+import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
+import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
+import org.jclouds.blobstore.util.BlobUtils;
+import org.jclouds.collect.Memoized;
+import org.jclouds.domain.Location;
+import org.jclouds.s3.blobstore.S3BlobStore;
+import org.jclouds.s3.blobstore.functions.BlobToObject;
+import org.jclouds.s3.blobstore.functions.BucketToResourceList;
+import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
+import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
+import org.jclouds.s3.blobstore.functions.ObjectToBlob;
+import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Provides AWS S3 specific extensions.
+ *
+ * @author Tibor Kiss
+ */
+public class AWSS3BlobStore extends S3BlobStore {
+
+   private final MultipartUploadStrategy multipartUploadStrategy;
+
+   @Inject
+   AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
+            @Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
+            BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
+            BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
+            BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
+            Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
+      super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions,
+               bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd,
+               fetchBlobMetadataProvider);
+      multipartUploadStrategy = new SequentialMultipartUploadStrategy(this);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob)
+    */
+   @Override
+   public String putBlobMultipart(String container, Blob blob) {
+      return multipartUploadStrategy.execute(container, blob);
+   }
+}
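
As a usage sketch of the new entry point (credentials, container and file
names are placeholders, and it assumes the companion changes that expose
putBlobMultipart on the BlobStore API, as referenced by the override above):

   import java.io.File;

   import org.jclouds.blobstore.BlobStore;
   import org.jclouds.blobstore.BlobStoreContext;
   import org.jclouds.blobstore.BlobStoreContextFactory;
   import org.jclouds.blobstore.domain.Blob;

   public class MultipartPutExample {
      public static void main(String[] args) {
         BlobStoreContext context = new BlobStoreContextFactory().createContext("aws-s3", "accessKey", "secretKey");
         try {
            BlobStore blobStore = context.getBlobStore(); // resolves to AWSS3BlobStore for aws-s3
            File file = new File("/tmp/large-upload.bin"); // hypothetical large local file
            Blob blob = blobStore.newBlob("backups/large-upload.bin");
            blob.setPayload(file);
            // splits the FilePayload into parts and drives the S3 multipart upload
            String eTag = blobStore.putBlobMultipart("my-container", blob);
            System.out.println("uploaded, eTag=" + eTag);
         } finally {
            context.close();
         }
      }
   }
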
diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java
new file mode 100644
index 0000000000..1a11c54459
--- /dev/null
+++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.aws.s3.blobstore.config;
+
+import org.jclouds.aws.s3.AWSS3AsyncClient;
+import org.jclouds.aws.s3.AWSS3Client;
+import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
+import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
+import org.jclouds.blobstore.AsyncBlobStore;
+import org.jclouds.blobstore.BlobRequestSigner;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.attr.ConsistencyModel;
+import org.jclouds.blobstore.config.BlobStoreMapModule;
+import org.jclouds.blobstore.internal.BlobStoreContextImpl;
+import org.jclouds.location.config.RegionsLocationModule;
+import org.jclouds.s3.blobstore.S3BlobRequestSigner;
+import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
+
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+
+/**
+ * Configures the AWS S3 specific blobstore bindings.
+ *
+ * @author Tibor Kiss
+ */
+public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
+
+   @Override
+   protected void configure() {
+      install(new BlobStoreMapModule());
+      install(new RegionsLocationModule());
+      bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
+      bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
+      bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
+      bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
+      }).in(Scopes.SINGLETON);
+      bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
+      bindBucketLocationStrategy();
+   }
+}
diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java
new file mode 100644
index 0000000000..fe4a91bfa6
--- /dev/null
+++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.aws.s3.blobstore.strategy;
+
+import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
+import org.jclouds.blobstore.domain.Blob;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ * Strategy for uploading a blob to AWS S3 with the multipart upload API.
+ *
+ * @author Tibor Kiss
+ */
+@ImplementedBy(SequentialMultipartUploadStrategy.class)
+public interface MultipartUploadStrategy {
+
+   /* Limits documented for the AWS S3 multipart upload API. */
+   long MAX_PART_SIZE = 5368709120L; // 5 GB
+   long MIN_PART_SIZE = 5242880L; // 5 MB
+   int MAX_NUMBER_OF_PARTS = 10000;
+
+   String execute(String container, Blob blob);
+}
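
A small arithmetic sketch of what the S3 multipart limits imply for object
sizes (the figures are the documented S3 limits, assumed here since the
interface body above was reconstructed):

   public class S3MultipartLimits {
      public static void main(String[] args) {
         long maxPartSize = 5368709120L; // 5 GB per part
         long minPartSize = 5242880L;    // 5 MB minimum for every part but the last
         int maxParts = 10000;           // at most 10000 parts per upload
         // largest object a single multipart upload can cover: 10000 * 5 GB
         System.out.println("max object size: " + maxParts * maxPartSize + " bytes");
         // an upload that used all 10000 parts at the 5 MB minimum would cover roughly
         System.out.println("10000 minimum-size parts: " + maxParts * minPartSize + " bytes");
      }
   }
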
diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java
new file mode 100644
--- /dev/null
+++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java
@@ -0,0 +1,185 @@
+/**
+ *
+ * Copyright (C) 2010 Cloud Conscious, LLC.
+ *
+ * ====================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ */
+
+package org.jclouds.aws.s3.blobstore.strategy.internal;
+
+import static org.jclouds.io.Payloads.newChunkedFilePayload;
+
+import java.io.File;
+import java.util.Map;
+
+import org.jclouds.aws.s3.AWSS3Client;
+import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
+import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
+import org.jclouds.blobstore.KeyNotFoundException;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.io.Payload;
+import org.jclouds.io.payloads.FilePayload;
+import org.jclouds.s3.domain.ObjectMetadataBuilder;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Provides a sequential multipart upload strategy.
+ *
+ * The file partitioning algorithm:
+ *
+ * The default part size is 32 MB, and the actual part size is always a
+ * multiple of it. The number of parts first grows until it reaches a chosen
+ * magnitude (for example 100 parts); after that the part size grows instead
+ * of the number of parts. Once the maximum part size is reached, the number
+ * of parts starts growing again.
+ *
+ * @author Tibor Kiss
+ */
+public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
+
+   private static final long DEFAULT_PART_SIZE = 33554432; // 32 MB
+   private static final int MAGNITUDE_BASE = 100;
+
+   private final AWSS3BlobStore ablobstore;
+
+   // calculated only once, but not from the constructor
+   private volatile long parts; // required number of parts with chunkSize
+   private volatile long chunkSize;
+   private volatile long remaining; // number of bytes remaining for the last part
+
+   // sequentially updated values
+   private volatile int part;
+   private volatile long chunkOffset;
+   private volatile long copied;
+
+   public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) {
+      this.ablobstore = ablobstore;
+   }
+
+   protected long calculateChunkSize(long length) {
+      long unitPartSize = DEFAULT_PART_SIZE; // first try with the default part size
+      long parts = length / unitPartSize;
+      long partSize = unitPartSize;
+      int magnitude = (int) (parts / MAGNITUDE_BASE);
+      if (magnitude > 0) {
+         partSize = magnitude * unitPartSize;
+         if (partSize > MAX_PART_SIZE) {
+            partSize = MAX_PART_SIZE;
+            unitPartSize = MAX_PART_SIZE;
+         }
+         parts = length / partSize;
+      }
+      if (parts > MAX_NUMBER_OF_PARTS) { // if the file would still split into too many parts
+         unitPartSize = MIN_PART_SIZE; // take the minimum part size
+         parts = length / unitPartSize;
+      }
+      if (parts > MAX_NUMBER_OF_PARTS) { // if it still splits into too many parts,
+         parts = MAX_NUMBER_OF_PARTS - 1; // cap the part count; the remainder may then
+                                          // exceed MAX_PART_SIZE (see the overflow note below)
+      }
+      long remainder = length % unitPartSize;
+      if (remainder == 0 && parts > 0) {
+         parts -= 1;
+      }
+      this.chunkSize = partSize;
+      this.parts = parts;
+      this.remaining = length - partSize * parts;
+      System.out.println(" " + length + " bytes partitioned in " + parts
+            + " parts of part size: " + chunkSize + ", remaining: " + remaining
+            + (remaining > MAX_PART_SIZE ? " overflow!" : ""));
+      return this.chunkSize;
+   }
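+
+   // Worked example (illustrative figures): a 100 MB file with the default
+   // 32 MB part size yields 3 full parts plus a 4 MB remainder, while a
+   // 500 GB file raises the magnitude to 160, so the part size grows to
+   // 160 * 32 MB = 5 GB and the file is covered by 100 chunks overall.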
+
+   protected long getParts() {
+      return parts;
+   }
+
+   protected int getNextPart() {
+      return ++part;
+   }
+
+   protected void addCopied(long copied) {
+      this.copied += copied;
+   }
+
+   protected long getNextChunkOffset() {
+      long next = chunkOffset;
+      chunkOffset += getChunkSize();
+      return next;
+   }
+
+   protected long getChunkSize() {
+      return chunkSize;
+   }
+
+   protected long getRemaining() {
+      return remaining;
+   }
+
+   private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId,
+         int part, File file, long chunkOffset, long chunkSize) {
+      Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize);
+      chunkedPart.getContentMetadata().setContentLength(chunkSize);
+      // chunkedPart.getContentMetadata().setContentMD5(???);
+      String eTag = null;
+      try {
+         eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
+      } catch (KeyNotFoundException e) {
+         // note that because of eventual consistency, the upload id may not be present yet
+         // we may wish to add this condition to the retry handler
+
+         // we may also choose to implement ListParts and wait for the uploadId to become
+         // available there.
+         eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
+      }
+      return eTag;
+   }
+
+   @Override
+   public String execute(String container, Blob blob) {
+      Payload payload = blob.getPayload();
+      if (payload instanceof FilePayload) {
+         String key = blob.getMetadata().getName();
+         File file = FilePayload.class.cast(payload).getRawContent();
+         calculateChunkSize(file.length());
+         long parts = getParts();
+         if (parts > 0) {
+            AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
+            String uploadId = client.initiateMultipartUpload(container,
+                  ObjectMetadataBuilder.create().key(key).build()); // TODO md5
+            Map<Integer, String> etags = Maps.newHashMap();
+            int part;
+            while ((part = getNextPart()) <= getParts()) {
+               String eTag = prepareUploadPart(client, container, key, uploadId, part, file,
+                     getNextChunkOffset(), chunkSize);
+               etags.put(Integer.valueOf(part), eTag);
+            }
+            long remaining = getRemaining();
+            if (remaining > 0) {
+               String eTag = prepareUploadPart(client, container, key, uploadId, part, file,
+                     getNextChunkOffset(), remaining);
+               etags.put(Integer.valueOf(part), eTag);
+            }
+            return client.completeMultipartUpload(container, key, uploadId, etags);
+         } else {
+            return ablobstore.putBlob(container, blob);
+         }
+      } else {
+         return ablobstore.putBlob(container, blob);
+      }
+   }
+}
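
To see the partitioning in action, a throwaway harness along these lines can
be used (a sketch; the subclass exists only to reach the protected
calculateChunkSize, and the printed figures follow from the algorithm as
written):

   import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;

   public class PartitioningProbe {

      // subclass only to expose the protected calculateChunkSize(long)
      static class Probe extends SequentialMultipartUploadStrategy {
         Probe() {
            super(null); // the blobstore is not needed for the size calculation
         }

         long chunkSizeFor(long length) {
            return calculateChunkSize(length);
         }
      }

      public static void main(String[] args) {
         Probe probe = new Probe();
         probe.chunkSizeFor(100L * 1024 * 1024);        // 100 MB: 3 x 32 MB + 4 MB remainder
         probe.chunkSizeFor(5L * 1024 * 1024 * 1024);   // 5 GB: 159 x 32 MB + 32 MB remainder
         probe.chunkSizeFor(500L * 1024 * 1024 * 1024); // 500 GB: part size grows to 5 GB
      }
   }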