From f5566a4a5796b58f6efc7e29f42f33c484395b89 Mon Sep 17 00:00:00 2001 From: Tibor Kiss Date: Fri, 4 Mar 2011 23:24:32 +0100 Subject: [PATCH 1/3] AWS S3 sequential Multipart Upload strategy --- .../atmos/blobstore/AtmosBlobStore.java | 10 ++++++++ .../s3/blobstore/S3AsyncBlobStore.java | 2 +- .../org/jclouds/s3/blobstore/S3BlobStore.java | 15 +++++++++++- .../swift/blobstore/SwiftBlobStore.java | 13 ++++++++++ .../java/org/jclouds/blobstore/BlobStore.java | 16 +++++++++++++ core/pom.xml | 5 ++++ .../main/java/org/jclouds/io/Payloads.java | 5 ++++ .../jclouds/aws/s3/AWSS3ContextBuilder.java | 7 ++++++ .../jclouds/aws/s3/AWSS3ClientLiveTest.java | 24 +++++++++++++++++++ .../azureblob/blobstore/AzureBlobStore.java | 13 ++++++++++ 10 files changed, 108 insertions(+), 2 deletions(-) diff --git a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java index 9ffd845297..35a8197e8e 100644 --- a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java +++ b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java @@ -207,6 +207,16 @@ public class AtmosBlobStore extends BaseBlobStore { return AtmosUtils.putBlob(sync, crypto, blob2Object, container, blob); } + /** + * This implementation invokes {@link AtmosClient#createFile} + *

+ * Since there is no etag support in atmos, we just return the path. + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + /** * This implementation invokes {@link AtmosClient#deletePath} */ diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java index b647b38d84..89fe0cc54e 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java @@ -83,7 +83,7 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { private final Provider fetchBlobMetadataProvider; @Inject - S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, + protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, @Memoized Supplier> locations, S3AsyncClient async, S3Client sync, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java index a861dd9494..ed2cc10fa8 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java @@ -74,7 +74,7 @@ public class S3BlobStore extends BaseBlobStore { private final Provider fetchBlobMetadataProvider; @Inject - S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, + protected S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, @Memoized Supplier> locations, S3Client sync, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, @@ -225,6 +225,19 @@ public class S3BlobStore extends BaseBlobStore { return sync.putObject(container, blob2Object.apply(blob)); } + /** + * This implementation invokes {@link S3Client#putObject} + * + * @param container + * bucket name + * @param blob + * object + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return sync.putObject(container, blob2Object.apply(blob)); + } + /** * This implementation invokes {@link S3Client#deleteObject} * diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index 2f26cf5f6a..009b94a6c5 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -195,6 +195,19 @@ public class SwiftBlobStore extends BaseBlobStore { return sync.putObject(container, blob2Object.apply(blob)); } + /** + * This implementation invokes {@link CommonSwiftClient#putObject} + * + * @param container + * container name + * @param blob + * object + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + /** * This implementation invokes {@link CommonSwiftClient#removeObject} * diff --git a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java index 82bd9961e1..939ff5c400 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java @@ -199,6 +199,22 @@ public interface BlobStore { * if the container doesn't exist */ String putBlob(String container, Blob blob); + + /** + * Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name} + * using multipart strategies. + * + * @param container + * container to place the blob. + * @param blob + * fully qualified name relative to the container. + * @param options + * byte range or condition options + * @return etag of the blob you uploaded, possibly null where etags are unsupported + * @throws ContainerNotFoundException + * if the container doesn't exist + */ + String putBlobMultipart(String container, Blob blob); /** * Retrieves the metadata of a {@code Blob} at location {@code container/name} diff --git a/core/pom.xml b/core/pom.xml index 7073c21e5b..1bc48d78ea 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -119,6 +119,11 @@ jsr305 1.3.9 + + org.jboss.netty + netty + 3.2.4.Final + diff --git a/core/src/main/java/org/jclouds/io/Payloads.java b/core/src/main/java/org/jclouds/io/Payloads.java index e143183ceb..854fb6126a 100644 --- a/core/src/main/java/org/jclouds/io/Payloads.java +++ b/core/src/main/java/org/jclouds/io/Payloads.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import org.jclouds.crypto.CryptoStreams; import org.jclouds.io.payloads.ByteArrayPayload; +import org.jclouds.io.payloads.ChunkedFilePayload; import org.jclouds.io.payloads.FilePayload; import org.jclouds.io.payloads.InputStreamPayload; import org.jclouds.io.payloads.StringPayload; @@ -83,6 +84,10 @@ public class Payloads { public static FilePayload newFilePayload(File data) { return new FilePayload(checkNotNull(data, "data")); } + + public static ChunkedFilePayload newChunkedFilePayload(File data, int part, long chunkOffset, long chunkSize) { + return new ChunkedFilePayload(checkNotNull(data, "data"), part, chunkOffset, chunkSize); + } public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap formParams, char... skips) { return new UrlEncodedFormPayload(formParams, skips); diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java index 2a717f8eaf..071663445a 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java @@ -22,8 +22,10 @@ package org.jclouds.aws.s3; import java.util.List; import java.util.Properties; +import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule; import org.jclouds.aws.s3.config.AWSS3RestClientModule; import org.jclouds.s3.S3ContextBuilder; +import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule; import com.google.inject.Module; @@ -37,6 +39,11 @@ public class AWSS3ContextBuilder extends S3ContextBuilder { super(props); } + @Override + protected void addContextModule(List modules) { + modules.add(new AWSS3BlobStoreContextModule()); + } + @Override protected void addClientModule(List modules) { modules.add(new AWSS3RestClientModule()); diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java index 5e54c2f966..6833bda5b1 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java @@ -27,11 +27,15 @@ import static org.jclouds.io.Payloads.newByteArrayPayload; import static org.testng.Assert.assertEquals; import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.zip.GZIPInputStream; +import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.KeyNotFoundException; +import org.jclouds.blobstore.domain.Blob; import org.jclouds.http.BaseJettyTest; import org.jclouds.http.apachehc.config.ApacheHCHttpCommandExecutorServiceModule; import org.jclouds.io.Payload; @@ -43,6 +47,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteStreams; import com.google.common.io.InputSupplier; import com.google.inject.Module; @@ -134,4 +139,23 @@ public class AWSS3ClientLiveTest extends S3ClientLiveTest { returnContainer(containerName); } } + + public void testMultipartChunkedFileStream() throws IOException, InterruptedException { + + FileOutputStream fous = new FileOutputStream(new File("target/const.txt")); + ByteStreams.copy(oneHundredOneConstitutions.getInput(), fous); + fous.flush(); + fous.close(); + String containerName = getContainerName(); + + try { + BlobStore blobStore = context.getBlobStore(); + blobStore.createContainerInLocation(null, containerName); + Blob blob = blobStore.blobBuilder("const.txt") + .payload(new File("target/const.txt")).build(); + blobStore.putBlobMultipart(containerName, blob); + } finally { + returnContainer(containerName); + } + } } diff --git a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java index 3764648c31..88b0a11f19 100644 --- a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java +++ b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java @@ -192,6 +192,19 @@ public class AzureBlobStore extends BaseBlobStore { return sync.putBlob(container, blob2AzureBlob.apply(blob)); } + /** + * This implementation invokes {@link AzureBlobClient#putObject} + * + * @param container + * container name + * @param blob + * object + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + /** * This implementation invokes {@link AzureBlobClient#deleteObject} * From 20b03aceb8ad0d16926acf46009c672890f876c8 Mon Sep 17 00:00:00 2001 From: Tibor Kiss Date: Fri, 4 Mar 2011 23:29:39 +0100 Subject: [PATCH 2/3] AWS S3 sequential multipart upload strategy, newly added files. --- .../io/payloads/ChunkedFileInputStream.java | 126 ++++++++++++ .../io/payloads/ChunkedFilePayload.java | 50 +++++ .../aws/s3/blobstore/AWSS3AsyncBlobStore.java | 67 +++++++ .../aws/s3/blobstore/AWSS3BlobStore.java | 76 +++++++ .../config/AWSS3BlobStoreContextModule.java | 59 ++++++ .../strategy/MultipartUploadStrategy.java | 50 +++++ .../SequentialMultipartUploadStrategy.java | 185 ++++++++++++++++++ 7 files changed, 613 insertions(+) create mode 100644 core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java create mode 100644 core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java diff --git a/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java b/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java new file mode 100644 index 0000000000..18649dbb4f --- /dev/null +++ b/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java @@ -0,0 +1,126 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io.payloads; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.stream.ChunkedFile; + +/** + * + * + * + * + * @author Tibor Kiss + */ +public class ChunkedFileInputStream extends InputStream { + + private static final int CHUNK_SIZE = 8192; + + private ChunkedFile chunks; + private ChannelBuffer chunk; + + private IOException ex; + + public ChunkedFileInputStream(String filename, long offset, long length) { + this(new File(filename), offset, length); + } + + public ChunkedFileInputStream(File file, long offset, long length) { + try { + this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE); + } catch (IOException ex) { + this.ex = ex; + } + } + + private ChannelBuffer getChunk() throws Exception { + if (ex != null) { + throw ex; + } + if (chunk == null) { + chunk = ChannelBuffer.class.cast(chunks.nextChunk()); + } + if (chunk != null) { + if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) { + chunk = ChannelBuffer.class.cast(chunks.nextChunk()); + if (chunk.readableBytes() < 1) { + return null; + } + } + } else { + return null; + } + return chunk; + } + + @Override + public int read() throws IOException { + try { + ChannelBuffer chunk = getChunk(); + if (chunk == null) + return -1; + if (chunk.readableBytes() < 1) + return -1; + int readIndex = chunk.readerIndex(); + byte abyte = chunk.getByte(readIndex); + chunk.readerIndex(readIndex + 1); + return (int)abyte; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + ChannelBuffer chunk = getChunk(); + if (chunk == null) + return -1; + int readable = chunk.readableBytes(); + if (readable < 1) + return -1; + if (readable > len) { + readable = len; + } + int readIndex = chunk.readerIndex(); + chunk.getBytes(readIndex, b, off, readable); + chunk.readerIndex(readIndex + readable); + return readable; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + try { + chunks.close(); + } catch (Exception e) { + throw new IOException(e); + } + } + +} diff --git a/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java b/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java new file mode 100644 index 0000000000..27d6983a19 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java @@ -0,0 +1,50 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io.payloads; + +import java.io.File; +import java.io.InputStream; + +public class ChunkedFilePayload extends FilePayload { + + private int part; + private long chunkOffset; + private long chunkSize; + + public ChunkedFilePayload(File content) { + this(content, 1, 0, content.length()); + } + + public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) { + super(content); + this.part = part; + this.chunkOffset = chunkOffset; + this.chunkSize = chunkSize; + } + + public int getPart() { + return part; + } + + @Override + public InputStream getInput() { + return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize); + } +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java new file mode 100644 index 0000000000..6b321bf4ed --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java @@ -0,0 +1,67 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore; + +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Provider; + +import org.jclouds.Constants; +import org.jclouds.aws.s3.AWSS3AsyncClient; +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.functions.BlobToHttpGetOptions; +import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata; +import org.jclouds.blobstore.util.BlobUtils; +import org.jclouds.collect.Memoized; +import org.jclouds.domain.Location; +import org.jclouds.s3.blobstore.S3AsyncBlobStore; +import org.jclouds.s3.blobstore.functions.BlobToObject; +import org.jclouds.s3.blobstore.functions.BucketToResourceList; +import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata; +import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions; +import org.jclouds.s3.blobstore.functions.ObjectToBlob; +import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; + +import com.google.common.base.Supplier; + +/** + * + * @author Tibor Kiss + */ +public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { + + @Inject + public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, AWSS3AsyncClient async, AWSS3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider) { + super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd, + container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, + blob2Object, object2BlobMd, fetchBlobMetadataProvider); + } + +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java new file mode 100644 index 0000000000..a2d7582e46 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java @@ -0,0 +1,76 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore; + +import java.util.Set; + +import javax.inject.Inject; +import javax.inject.Provider; + +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.functions.BlobToHttpGetOptions; +import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata; +import org.jclouds.blobstore.util.BlobUtils; +import org.jclouds.collect.Memoized; +import org.jclouds.domain.Location; +import org.jclouds.s3.blobstore.S3BlobStore; +import org.jclouds.s3.blobstore.functions.BlobToObject; +import org.jclouds.s3.blobstore.functions.BucketToResourceList; +import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata; +import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions; +import org.jclouds.s3.blobstore.functions.ObjectToBlob; +import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; + +import com.google.common.base.Supplier; + +/** + * Proived AWS S3 specific extensions. + * + * @author Tibor Kiss + */ +public class AWSS3BlobStore extends S3BlobStore { + + private MultipartUploadStrategy multipartUploadStrategy; + + @Inject + AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, + @Memoized Supplier> locations, AWSS3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider) { + super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions, + bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd, + fetchBlobMetadataProvider); + multipartUploadStrategy = new SequentialMultipartUploadStrategy(this); + } + + /* (non-Javadoc) + * @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob) + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return multipartUploadStrategy.execute(container, blob); + } +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java new file mode 100644 index 0000000000..1a11c54459 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java @@ -0,0 +1,59 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.config; + +import org.jclouds.aws.s3.AWSS3AsyncClient; +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore; +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.blobstore.AsyncBlobStore; +import org.jclouds.blobstore.BlobRequestSigner; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.attr.ConsistencyModel; +import org.jclouds.blobstore.config.BlobStoreMapModule; +import org.jclouds.blobstore.internal.BlobStoreContextImpl; +import org.jclouds.location.config.RegionsLocationModule; +import org.jclouds.s3.blobstore.S3BlobRequestSigner; +import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule; + +import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; + +/** + * + * + * @author Tibor Kiss + */ +public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule { + + @Override + protected void configure() { + install(new BlobStoreMapModule()); + install(new RegionsLocationModule()); + bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL); + bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON); + bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON); + bind(BlobStoreContext.class).to(new TypeLiteral>() { + }).in(Scopes.SINGLETON); + bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class); + bindBucketLocationStrategy(); + } +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java new file mode 100644 index 0000000000..fe4a91bfa6 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java @@ -0,0 +1,50 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy; + +import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; +import org.jclouds.blobstore.domain.Blob; + +import com.google.inject.ImplementedBy; + +/** + * @see + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static org.jclouds.io.Payloads.newChunkedFilePayload; + +import java.io.File; + +import java.util.Map; + +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.blobstore.KeyNotFoundException; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.io.Payload; +import org.jclouds.io.payloads.FilePayload; +import org.jclouds.s3.domain.ObjectMetadataBuilder; + +import com.google.common.collect.Maps; + +/** + * Provides a sequential multipart upload strategy. + * + * The file partitioning algorithm: + * + * The default partition size we choose is 32mb. + * A multiple of this default partition size is used. + * The number of parts first grows to a chosen magnitude (for example 100 parts), + * then it grows the partition size instead of number of partitions. + * When we reached the maximum part size, then again it starts to grow the number of partitions. + * + * @author Tibor Kiss + */ +public class SequentialMultipartUploadStrategy implements + MultipartUploadStrategy { + + private final long DEFAULT_PART_SIZE = 33554432; // 32mb + private final int MAGNITUDE_BASE = 100; + + private final AWSS3BlobStore ablobstore; + + // calculated only once, but not from the constructor + private volatile long parts; // required number of parts with chunkSize + private volatile long chunkSize; + private volatile long remaining; // number of bytes remained for the last part + + // sequentially updated values + private volatile int part; + private volatile long chunkOffset; + private volatile long copied; + + public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) { + this.ablobstore = ablobstore; + } + + protected long calculateChunkSize(long length) { + long unitPartSize = DEFAULT_PART_SIZE; // first try with default part size + long parts = length / unitPartSize; + long partSize = unitPartSize; + int magnitude = (int) (parts / MAGNITUDE_BASE); + if (magnitude > 0) { + partSize = magnitude * unitPartSize; + if (partSize > MAX_PART_SIZE) { + partSize = MAX_PART_SIZE; + unitPartSize = MAX_PART_SIZE; + } + parts = length / partSize; + } + if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or + // cannot be split + unitPartSize = MIN_PART_SIZE; // take the minimum part size + parts = length / unitPartSize; + } + if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts + parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not + // covering + } + long remainder = length % unitPartSize; + if (remainder == 0 && parts > 0) { + parts -= 1; + } + this.chunkSize = partSize; + this.parts = parts; + this.remaining = length - partSize * parts; + System.out.println(" " + length + " bytes partitioned in " + parts + + " parts of part size: " + chunkSize + ", remaining: " + + remaining + (remaining > MAX_PART_SIZE ? " overflow!" : "")); + return this.chunkSize; + } + + protected long getParts() { + return parts; + } + + protected int getNextPart() { + return ++part; + } + + protected void addCopied(long copied) { + this.copied += copied; + } + + protected long getNextChunkOffset() { + long next = chunkOffset; + chunkOffset += getChunkSize(); + return next; + } + + protected long getChunkSize() { + return chunkSize; + } + + protected long getRemaining() { + return remaining; + } + + private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, + int part, File file, long chunkOffset, long chunkSize) { + Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize); + chunkedPart.getContentMetadata().setContentLength(chunkSize); + //chukedPayload.getContentMetadata().setContentMD5(???); + String eTag = null; + try { + eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + } catch (KeyNotFoundException e) { + // note that because of eventual consistency, the upload id may not be present yet + // we may wish to add this condition to the retry handler + + // we may also choose to implement ListParts and wait for the uploadId to become + // available there. + eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + } + return eTag; + } + + @Override + public String execute(String container, Blob blob) { + Payload payload = blob.getPayload(); + if (payload instanceof FilePayload) { + String key = blob.getMetadata().getName(); + File file = FilePayload.class.cast(payload).getRawContent(); + calculateChunkSize(file.length()); + long parts = getParts(); + if (parts > 0) { + AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi(); + String uploadId = client.initiateMultipartUpload(container, + ObjectMetadataBuilder.create().key(key).build()); // TODO md5 + Map etags = Maps.newHashMap(); + int part; + while ((part = getNextPart()) <= getParts()) { + String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), chunkSize); + etags.put(new Integer(part), eTag); + } + long remaining = getRemaining(); + if (remaining > 0) { + String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), remaining); + etags.put(new Integer(part), eTag); + } + return client.completeMultipartUpload(container, key, uploadId, etags); + } else { + return ablobstore.putBlob(container, blob); + } + } else { + return ablobstore.putBlob(container, blob); + } + } +} From b6667353f9fa2f04680f48bb2b65a293c5edfce6 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sun, 6 Mar 2011 00:04:46 -0500 Subject: [PATCH 3/3] Issue 486: moved netty to a driver and created a base payload slicer --- .../atmos/blobstore/AtmosAsyncBlobStore.java | 5 + .../filesystem/FilesystemAsyncBlobStore.java | 6 + .../s3/blobstore/S3AsyncBlobStore.java | 31 +++-- .../config/S3BlobStoreContextModule.java | 8 +- .../swift/blobstore/SwiftAsyncBlobStore.java | 48 ++++---- .../org/jclouds/blobstore/AsyncBlobStore.java | 7 ++ .../java/org/jclouds/blobstore/BlobStore.java | 10 +- .../blobstore/TransientAsyncBlobStore.java | 6 + core/pom.xml | 5 - .../java/org/jclouds/io/PayloadSlicer.java | 47 +++++++ .../main/java/org/jclouds/io/Payloads.java | 5 - .../io/internal/BasePayloadSlicer.java | 104 ++++++++++++++++ .../payloads/InputStreamSupplierPayload.java | 75 +++++++++++ drivers/netty/.pom.xml.swp | Bin 0 -> 12288 bytes drivers/netty/pom.xml | 82 +++++++++++++ .../netty/config/NettyPayloadModule.java | 20 +++ .../netty/io}/ChunkedFileInputStream.java | 31 +++-- .../jclouds/netty/io/NettyPayloadSlicer.java | 40 +++--- drivers/pom.xml | 1 + providers/aws-s3/pom.xml | 27 ++++ .../jclouds/aws/s3/AWSS3ContextBuilder.java | 1 - .../aws/s3/blobstore/AWSS3AsyncBlobStore.java | 25 +++- .../aws/s3/blobstore/AWSS3BlobStore.java | 27 ++-- .../config/AWSS3BlobStoreContextModule.java | 29 +++-- .../SequentialMultipartUploadStrategy.java | 116 +++++++++--------- .../integration/AWSS3TestInitializer.java | 3 +- .../blobstore/AzureAsyncBlobStore.java | 41 ++++--- 27 files changed, 597 insertions(+), 203 deletions(-) create mode 100644 core/src/main/java/org/jclouds/io/PayloadSlicer.java create mode 100644 core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java create mode 100644 core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java create mode 100644 drivers/netty/.pom.xml.swp create mode 100644 drivers/netty/pom.xml create mode 100644 drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java rename {core/src/main/java/org/jclouds/io/payloads => drivers/netty/src/main/java/org/jclouds/netty/io}/ChunkedFileInputStream.java (94%) rename core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java => drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java (54%) diff --git a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java index a96ed0b66c..25025050e0 100644 --- a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java +++ b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java @@ -250,4 +250,9 @@ public class AtmosAsyncBlobStore extends BaseAsyncBlobStore { return async.deletePath(container + "/" + key); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java b/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java index 694e884d65..631f68723f 100644 --- a/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java +++ b/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java @@ -660,4 +660,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore { String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5()); return eTag; } + + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java index 89fe0cc54e..95e757ef9e 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java @@ -84,12 +84,12 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { @Inject protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, - @Memoized Supplier> locations, S3AsyncClient async, S3Client sync, - BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, - BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, - BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, - Provider fetchBlobMetadataProvider) { + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, S3AsyncClient async, S3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider) { super(context, blobUtils, service, defaultLocation, locations); this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions"); this.async = checkNotNull(async, "async"); @@ -109,11 +109,11 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures.compose(async.listOwnedBuckets(), - new Function, org.jclouds.blobstore.domain.PageSet>() { - public org.jclouds.blobstore.domain.PageSet apply(Set from) { - return new PageSetImpl(Iterables.transform(from, bucket2ResourceMd), null); - } - }, service); + new Function, org.jclouds.blobstore.domain.PageSet>() { + public org.jclouds.blobstore.domain.PageSet apply(Set from) { + return new PageSetImpl(Iterables.transform(from, bucket2ResourceMd), null); + } + }, service); } /** @@ -153,9 +153,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { ListBucketOptions httpOptions = container2BucketListOptions.apply(options); ListenableFuture returnVal = async.listBucket(container, httpOptions); ListenableFuture> list = Futures.compose(returnVal, bucket2ResourceList, - service); + service); return (options.isDetailed()) ? Futures.compose(list, - fetchBlobMetadataProvider.get().setContainerName(container), service) : list; + fetchBlobMetadataProvider.get().setContainerName(container), service) : list; } /** @@ -238,4 +238,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { return async.deleteObject(container, key); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java index 815d2e5ff6..3a65f9fd0c 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java @@ -55,12 +55,16 @@ public class S3BlobStoreContextModule extends AbstractModule { bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL); bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON); bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON); - bind(BlobStoreContext.class).to(new TypeLiteral>() { - }).in(Scopes.SINGLETON); + bindContext(); bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class); bindBucketLocationStrategy(); } + protected void bindContext() { + bind(BlobStoreContext.class).to(new TypeLiteral>() { + }).in(Scopes.SINGLETON); + } + protected void bindBucketLocationStrategy() { bind(new TypeLiteral>() { }).to(LocationFromBucketLocation.class); diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java index 84eeed2afe..7e59547c8e 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java @@ -83,13 +83,13 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Inject SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, - @Memoized Supplier> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async, - ContainerToResourceMetadata container2ResourceMd, - BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, - ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, - ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider) { + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async, + ContainerToResourceMetadata container2ResourceMd, + BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, + ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, + ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, + Provider fetchBlobMetadataProvider) { super(context, blobUtils, service, defaultLocation, locations); this.sync = sync; this.async = async; @@ -109,12 +109,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures.compose(async.listContainers(), - new Function, org.jclouds.blobstore.domain.PageSet>() { - public org.jclouds.blobstore.domain.PageSet apply( - Set from) { - return new PageSetImpl(Iterables.transform(from, container2ResourceMd), null); - } - }, service); + new Function, org.jclouds.blobstore.domain.PageSet>() { + public org.jclouds.blobstore.domain.PageSet apply(Set from) { + return new PageSetImpl(Iterables.transform(from, container2ResourceMd), null); + } + }, service); } /** @@ -145,12 +144,12 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list(String container, ListContainerOptions options) { org.jclouds.openstack.swift.options.ListContainerOptions httpOptions = container2ContainerListOptions - .apply(options); + .apply(options); ListenableFuture> returnVal = async.listObjects(container, httpOptions); ListenableFuture> list = Futures.compose(returnVal, container2ResourceList, - service); + service); return options.isDetailed() ? Futures.compose(list, fetchBlobMetadataProvider.get().setContainerName(container), - service) : list; + service) : list; } /** @@ -177,14 +176,14 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture blobMetadata(String container, String key) { return Futures.compose(async.getObjectInfo(container, key), - new Function() { + new Function() { - @Override - public BlobMetadata apply(MutableObjectInfoWithMetadata from) { - return object2BlobMd.apply(from); - } + @Override + public BlobMetadata apply(MutableObjectInfoWithMetadata from) { + return object2BlobMd.apply(from); + } - }, service); + }, service); } /** @@ -235,4 +234,9 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { return !sync.containerExists(container); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java index c71c81ab72..7cb6990e89 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java @@ -32,6 +32,7 @@ import org.jclouds.blobstore.options.GetOptions; import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.domain.Location; +import com.google.common.annotations.Beta; import com.google.common.util.concurrent.ListenableFuture; /** @@ -126,6 +127,12 @@ public interface AsyncBlobStore { */ ListenableFuture putBlob(String container, Blob blob); + /** + * @see BlobStore#putBlobMultipart + */ + @Beta + ListenableFuture putBlobMultipart(String container, Blob blob); + /** * @see BlobStore#blobMetadata */ diff --git a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java index 939ff5c400..a347567ee2 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java @@ -32,6 +32,8 @@ import org.jclouds.blobstore.options.GetOptions; import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.domain.Location; +import com.google.common.annotations.Beta; + /** * Synchronous access to a BlobStore such as Amazon S3 * @@ -48,17 +50,18 @@ public interface BlobStore { /** * creates a new blob with the specified name. + * * @see #blobBuilder */ @Deprecated Blob newBlob(String name); - + /** * * @return builder for creating new {@link Blob}s */ BlobBuilder blobBuilder(String name); - + /** * The get locations command returns all the valid locations for containers. A location has a * scope, which is typically region or zone. A region is a general area, like eu-west, where a @@ -199,7 +202,7 @@ public interface BlobStore { * if the container doesn't exist */ String putBlob(String container, Blob blob); - + /** * Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name} * using multipart strategies. @@ -214,6 +217,7 @@ public interface BlobStore { * @throws ContainerNotFoundException * if the container doesn't exist */ + @Beta String putBlobMultipart(String container, Blob blob); /** diff --git a/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java index 706339ff1f..7afac3bbae 100755 --- a/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java @@ -675,4 +675,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { return containerToLocation; } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + // TODO implement + return putBlob(container, blob); + } + } diff --git a/core/pom.xml b/core/pom.xml index 1bc48d78ea..7073c21e5b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -119,11 +119,6 @@ jsr305 1.3.9 - - org.jboss.netty - netty - 3.2.4.Final - diff --git a/core/src/main/java/org/jclouds/io/PayloadSlicer.java b/core/src/main/java/org/jclouds/io/PayloadSlicer.java new file mode 100644 index 0000000000..fc0092af76 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/PayloadSlicer.java @@ -0,0 +1,47 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io; + +import org.jclouds.io.internal.BasePayloadSlicer; + +import com.google.inject.ImplementedBy; + + +/** + * + * @author Adrian Cole + */ +@ImplementedBy(BasePayloadSlicer.class) +public interface PayloadSlicer { + /** + * Returns a {@link Payload} that returns input streams from the an underlying payload, where + * each stream starts at the given offset and is limited to the specified number of bytes. + * + * @param input + * the payload from which to get the raw streams + * @param offset + * the offset in bytes into the underlying stream where the returned streams will start + * @param length + * the maximum length of the returned streams + * @throws IllegalArgumentException + * if offset or length are negative + */ + Payload slice(Payload input, long offset, long length); +} \ No newline at end of file diff --git a/core/src/main/java/org/jclouds/io/Payloads.java b/core/src/main/java/org/jclouds/io/Payloads.java index 854fb6126a..e143183ceb 100644 --- a/core/src/main/java/org/jclouds/io/Payloads.java +++ b/core/src/main/java/org/jclouds/io/Payloads.java @@ -35,7 +35,6 @@ import javax.annotation.Nullable; import org.jclouds.crypto.CryptoStreams; import org.jclouds.io.payloads.ByteArrayPayload; -import org.jclouds.io.payloads.ChunkedFilePayload; import org.jclouds.io.payloads.FilePayload; import org.jclouds.io.payloads.InputStreamPayload; import org.jclouds.io.payloads.StringPayload; @@ -84,10 +83,6 @@ public class Payloads { public static FilePayload newFilePayload(File data) { return new FilePayload(checkNotNull(data, "data")); } - - public static ChunkedFilePayload newChunkedFilePayload(File data, int part, long chunkOffset, long chunkSize) { - return new ChunkedFilePayload(checkNotNull(data, "data"), part, chunkOffset, chunkSize); - } public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap formParams, char... skips) { return new UrlEncodedFormPayload(formParams, skips); diff --git a/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java b/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java new file mode 100644 index 0000000000..545b1e3f51 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java @@ -0,0 +1,104 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io.internal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; + +import javax.inject.Singleton; + +import org.jclouds.io.InputSuppliers; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.io.payloads.BaseMutableContentMetadata; +import org.jclouds.io.payloads.InputStreamSupplierPayload; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteStreams; + +/** + * + * @author Adrian Cole + */ +@Singleton +public class BasePayloadSlicer implements PayloadSlicer { + /** + * {@inheritDoc} + */ + @Override + public Payload slice(Payload input, long offset, long length) { + checkNotNull(input); + checkArgument(offset >= 0, "offset is negative"); + checkArgument(length >= 0, "length is negative"); + Payload returnVal; + if (input.getRawContent() instanceof File) { + returnVal = doSlice((File) input.getRawContent(), offset, length); + } else if (input.getRawContent() instanceof String) { + returnVal = doSlice((byte[]) input.getRawContent(), offset, length); + } else if (input.getRawContent() instanceof byte[]) { + returnVal = doSlice((byte[]) input.getRawContent(), offset, length); + } else { + returnVal = doSlice(input.getInput(), offset, length); + } + return copyMetadataAndSetLength(input, returnVal, length); + } + + protected Payload doSlice(Payload content, long offset, long length) { + return new InputStreamSupplierPayload(ByteStreams.slice(content, offset, length)); + } + + protected Payload doSlice(String content, long offset, long length) { + return doSlice(content.getBytes(), offset, length); + } + + protected Payload doSlice(File content, long offset, long length) { + try { + return doSlice(new FileInputStream(content), offset, length); + } catch (FileNotFoundException e) { + Throwables.propagate(e); + return null; + } + } + + protected Payload doSlice(InputStream content, long offset, long length) { + return new InputStreamSupplierPayload(ByteStreams.slice(InputSuppliers.of(content), offset, length)); + } + + protected Payload doSlice(byte[] content, long offset, long length) { + Payload returnVal; + checkArgument(offset <= Integer.MAX_VALUE, "offset is too big for an array"); + checkArgument(length <= Integer.MAX_VALUE, "length is too big for an array"); + returnVal = new InputStreamSupplierPayload( + ByteStreams.newInputStreamSupplier(content, (int) offset, (int) length)); + return returnVal; + } + + protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, long length) { + returnVal.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata() + .toBuilder().contentLength(length).contentMD5(null).build())); + return returnVal; + } + +} \ No newline at end of file diff --git a/core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java b/core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java new file mode 100644 index 0000000000..2dd1043a29 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java @@ -0,0 +1,75 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io.payloads; + +import static com.google.common.io.Closeables.closeQuietly; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.io.InputSupplier; + +/** + * @author Adrian Cole + */ +public class InputStreamSupplierPayload extends BasePayload> { + private List toClose = Lists.newArrayList(); + + public InputStreamSupplierPayload(InputSupplier content) { + super(content); + } + + /** + * {@inheritDoc} + */ + @Override + public InputStream getInput() { + try { + InputStream returnVal = content.getInput(); + toClose.add(returnVal); + return returnVal; + } catch (IOException e) { + Throwables.propagate(e); + return null; + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isRepeatable() { + return true; + } + + /** + * if we created the stream, then it is already consumed on close. + */ + @Override + public void release() { + if (toClose.size() > 0) + for (InputStream content = toClose.remove(0); toClose.size() > 0; content = toClose.remove(0)) + closeQuietly(content); + } + +} \ No newline at end of file diff --git a/drivers/netty/.pom.xml.swp b/drivers/netty/.pom.xml.swp new file mode 100644 index 0000000000000000000000000000000000000000..d57d9b18a0787ecb82428c4444caaa0babadd5d7 GIT binary patch literal 12288 zcmeI2O>7%Q6vrnOUxgM>3lcY`2BB8U?mB5Cs&wPxx~*!+7je=L0jkEkV|&7SXW1FY zvG^26S|M>m2rdZG8)ptZa6oVX4t!jC0PPg zOX=s>rqA>?|Hu`{704CH704CH704CH704CH704CH704CH6}Sx*U_K##;g|pV9dI6h z{~!GRfAs-EegNNt@4!Xy7AS*1?L^o+N#P%S}!Y2^p3S+mrT`eM*EZaGgOT8JH!XE{nBjFi%(gk~wnZI+>P@ zrp;J(BG}2eX|L9$Qg%9G&@nW3hf!s`&%~N(HBR(n&|K-Qm+k*3_|!N(`TiI zgD-a-^bAeitz^uI{;$Nno(4P7_PUEUh;I~IvNjvUO=^geAxk#2od`RSbSMIHY?MCR zsoLL*Rx*<{497BybOUimY7djyJz!b1E|IM|W>F-<<}?jC_ojjeev@QKkfPBna_Q!^-&2KQsnDnhPMS4)Z zwkhB4phB6l-IvPlpEBK4z3^;eF%o*TV2~$Z8V6DrnQoPA+rhsy^1;zmvj;~u#ydzs zhtHr1yJ*$s%Zs(s3(H&7;_L%x5J~zKTbJUk;@KQXHVzk(^WaSSM4YmVcF}bBh* zm6%pui?`V<4(cW=I8#V!07^ict?|%i5kt4hQY^Y-CZgL}m?75M)P>@OqP?#CLW#J8 zl)-u4P~F7j1!^p(;n~o-+wI!j!^VlpVsXMbH#b`|PFo^W8t0RFfskQ7&fJ2_h!cpm z&_)zRJ9t}(%^sK)9uF0_jY*C3Y$I*RzLAQ!CmO<|0SlWQ*5tG)@yH9YuV}>Mwou8N zilQv^%{*tT%cmDsmg(8@(o%VTxq7lj7nW#cVg5vQxwA?+$0I%*ub1vxj< z55IzFW6^ABda!bcP8KJMw2~Z>rxhfOCuB#B)7jaIO + + + + 4.0.0 + + org.jclouds + jclouds-project + 1.0-SNAPSHOT + ../../project/pom.xml + + org.jclouds.driver + jclouds-netty + jclouds netty payload module + jclouds netty payload module + + + + + jclouds-sona-snapshots-nexus + https://oss.sonatype.org/content/repositories/snapshots + + false + + + true + + + + jboss-public-releases + https://repository.jboss.org/nexus/content/groups/public-jboss + + true + + + false + + + + + + + org.jclouds + jclouds-core + ${project.version} + + + org.jclouds + jclouds-core + ${project.version} + test-jar + test + + + org.jboss.netty + netty + 3.2.4.Final + provided + + + + diff --git a/drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java b/drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java new file mode 100644 index 0000000000..c0aedb5563 --- /dev/null +++ b/drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java @@ -0,0 +1,20 @@ +package org.jclouds.netty.config; + +import org.jclouds.io.PayloadSlicer; +import org.jclouds.netty.io.NettyPayloadSlicer; + +import com.google.inject.AbstractModule; + +/** + * + * @author Adrian Cole + * + */ +public class NettyPayloadModule extends AbstractModule { + + @Override + protected void configure() { + bind(PayloadSlicer.class).to(NettyPayloadSlicer.class); + } + +} diff --git a/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java b/drivers/netty/src/main/java/org/jclouds/netty/io/ChunkedFileInputStream.java similarity index 94% rename from core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java rename to drivers/netty/src/main/java/org/jclouds/netty/io/ChunkedFileInputStream.java index 18649dbb4f..73436f1195 100644 --- a/core/src/main/java/org/jclouds/io/payloads/ChunkedFileInputStream.java +++ b/drivers/netty/src/main/java/org/jclouds/netty/io/ChunkedFileInputStream.java @@ -17,10 +17,9 @@ * ==================================================================== */ -package org.jclouds.io.payloads; +package org.jclouds.netty.io; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; @@ -30,24 +29,24 @@ import org.jboss.netty.handler.stream.ChunkedFile; /** * - * - * - * + * + * + * * @author Tibor Kiss */ public class ChunkedFileInputStream extends InputStream { - + private static final int CHUNK_SIZE = 8192; - + private ChunkedFile chunks; private ChannelBuffer chunk; - + private IOException ex; public ChunkedFileInputStream(String filename, long offset, long length) { this(new File(filename), offset, length); } - + public ChunkedFileInputStream(File file, long offset, long length) { try { this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE); @@ -62,9 +61,9 @@ public class ChunkedFileInputStream extends InputStream { } if (chunk == null) { chunk = ChannelBuffer.class.cast(chunks.nextChunk()); - } + } if (chunk != null) { - if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) { + if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) { chunk = ChannelBuffer.class.cast(chunks.nextChunk()); if (chunk.readableBytes() < 1) { return null; @@ -75,7 +74,7 @@ public class ChunkedFileInputStream extends InputStream { } return chunk; } - + @Override public int read() throws IOException { try { @@ -87,15 +86,15 @@ public class ChunkedFileInputStream extends InputStream { int readIndex = chunk.readerIndex(); byte abyte = chunk.getByte(readIndex); chunk.readerIndex(readIndex + 1); - return (int)abyte; + return (int) abyte; } catch (Exception e) { throw new IOException(e); } } - + @Override public int read(byte[] b, int off, int len) throws IOException { - try { + try { ChannelBuffer chunk = getChunk(); if (chunk == null) return -1; @@ -111,7 +110,7 @@ public class ChunkedFileInputStream extends InputStream { return readable; } catch (Exception e) { throw new IOException(e); - } + } } @Override diff --git a/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java b/drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java similarity index 54% rename from core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java rename to drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java index 27d6983a19..b130936d40 100644 --- a/core/src/main/java/org/jclouds/io/payloads/ChunkedFilePayload.java +++ b/drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java @@ -17,34 +17,26 @@ * ==================================================================== */ -package org.jclouds.io.payloads; +package org.jclouds.netty.io; import java.io.File; -import java.io.InputStream; -public class ChunkedFilePayload extends FilePayload { +import javax.inject.Singleton; - private int part; - private long chunkOffset; - private long chunkSize; - - public ChunkedFilePayload(File content) { - this(content, 1, 0, content.length()); - } - - public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) { - super(content); - this.part = part; - this.chunkOffset = chunkOffset; - this.chunkSize = chunkSize; - } - - public int getPart() { - return part; - } +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; +import org.jclouds.io.internal.BasePayloadSlicer; + +/** + * + * @author Adrian Cole + */ +@Singleton +public class NettyPayloadSlicer extends BasePayloadSlicer { @Override - public InputStream getInput() { - return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize); + protected Payload doSlice(File content, long offset, long length) { + return Payloads.newInputStreamPayload(new ChunkedFileInputStream(content, offset, length)); } -} + +} \ No newline at end of file diff --git a/drivers/pom.xml b/drivers/pom.xml index e151791a72..a5d240a0da 100644 --- a/drivers/pom.xml +++ b/drivers/pom.xml @@ -38,6 +38,7 @@ bouncycastle log4j jsch + netty enterprise diff --git a/providers/aws-s3/pom.xml b/providers/aws-s3/pom.xml index 9fea31babb..48048825a6 100644 --- a/providers/aws-s3/pom.xml +++ b/providers/aws-s3/pom.xml @@ -42,6 +42,21 @@ ${test.aws.credential} + + + + jboss-public-releases + https://repository.jboss.org/nexus/content/groups/public-jboss + + true + + + false + + + + + org.jclouds.api @@ -94,6 +109,18 @@ 1.2.16 test + + org.jclouds.driver + jclouds-netty + ${project.version} + test + + + org.jboss.netty + netty + 3.2.4.Final + test + diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java index 071663445a..bdb5d8a8bb 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java @@ -25,7 +25,6 @@ import java.util.Properties; import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule; import org.jclouds.aws.s3.config.AWSS3RestClientModule; import org.jclouds.s3.S3ContextBuilder; -import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule; import com.google.inject.Module; diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java index 6b321bf4ed..7c83451d57 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java @@ -29,7 +29,9 @@ import javax.inject.Provider; import org.jclouds.Constants; import org.jclouds.aws.s3.AWSS3AsyncClient; import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.functions.BlobToHttpGetOptions; import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata; import org.jclouds.blobstore.util.BlobUtils; @@ -44,13 +46,17 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlob; import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; /** - * + * * @author Tibor Kiss */ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { + private final Provider multipartUploadStrategy; + @Inject public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, @@ -58,10 +64,19 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, - Provider fetchBlobMetadataProvider) { - super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd, - container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, - blob2Object, object2BlobMd, fetchBlobMetadataProvider); + Provider fetchBlobMetadataProvider, + Provider multipartUploadStrategy) { + super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd, + container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, + object2BlobMd, fetchBlobMetadataProvider); + this.multipartUploadStrategy = multipartUploadStrategy; + } + + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + // TODO: make this better + // need to use a provider if the strategy object is stateful + return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob)); } } diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java index a2d7582e46..7e3983b4a3 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java @@ -26,7 +26,6 @@ import javax.inject.Provider; import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; -import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.functions.BlobToHttpGetOptions; @@ -45,32 +44,30 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; import com.google.common.base.Supplier; /** - * Proived AWS S3 specific extensions. - * + * Proived AWS S3 specific extensions. + * * @author Tibor Kiss */ public class AWSS3BlobStore extends S3BlobStore { - - private MultipartUploadStrategy multipartUploadStrategy; + + private final Provider multipartUploadStrategy; @Inject AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, - @Memoized Supplier> locations, AWSS3Client sync, - BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, - BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, - BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, - Provider fetchBlobMetadataProvider) { + @Memoized Supplier> locations, AWSS3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider, Provider multipartUploadStrategy) { super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd, fetchBlobMetadataProvider); - multipartUploadStrategy = new SequentialMultipartUploadStrategy(this); + this.multipartUploadStrategy = multipartUploadStrategy; } - /* (non-Javadoc) - * @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob) - */ @Override public String putBlobMultipart(String container, Blob blob) { - return multipartUploadStrategy.execute(container, blob); + // need to use a provider if the strategy object is stateful + return multipartUploadStrategy.get().execute(container, blob); } } diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java index 1a11c54459..75cea06094 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java @@ -23,15 +23,12 @@ import org.jclouds.aws.s3.AWSS3AsyncClient; import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore; import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; -import org.jclouds.blobstore.AsyncBlobStore; -import org.jclouds.blobstore.BlobRequestSigner; -import org.jclouds.blobstore.BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.attr.ConsistencyModel; -import org.jclouds.blobstore.config.BlobStoreMapModule; import org.jclouds.blobstore.internal.BlobStoreContextImpl; -import org.jclouds.location.config.RegionsLocationModule; -import org.jclouds.s3.blobstore.S3BlobRequestSigner; +import org.jclouds.s3.blobstore.S3AsyncBlobStore; +import org.jclouds.s3.blobstore.S3BlobStore; import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule; import com.google.inject.Scopes; @@ -39,21 +36,23 @@ import com.google.inject.TypeLiteral; /** * - * + * * @author Tibor Kiss */ public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule { @Override protected void configure() { - install(new BlobStoreMapModule()); - install(new RegionsLocationModule()); - bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL); - bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON); - bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON); + super.configure(); + bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON); + bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON); + bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class); + } + + @Override + protected void bindContext() { bind(BlobStoreContext.class).to(new TypeLiteral>() { }).in(Scopes.SINGLETON); - bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class); - bindBucketLocationStrategy(); } + } diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index bab6765f1c..25d338ebe6 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -19,19 +19,23 @@ package org.jclouds.aws.s3.blobstore.strategy.internal; -import static org.jclouds.io.Payloads.newChunkedFilePayload; - -import java.io.File; +import static com.google.common.base.Preconditions.checkNotNull; import java.util.Map; +import javax.annotation.Resource; +import javax.inject.Inject; +import javax.inject.Named; + import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; -import org.jclouds.io.payloads.FilePayload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.logging.Logger; import org.jclouds.s3.domain.ObjectMetadataBuilder; import com.google.common.collect.Maps; @@ -40,35 +44,39 @@ import com.google.common.collect.Maps; * Provides a sequential multipart upload strategy. * * The file partitioning algorithm: - * - * The default partition size we choose is 32mb. - * A multiple of this default partition size is used. - * The number of parts first grows to a chosen magnitude (for example 100 parts), - * then it grows the partition size instead of number of partitions. - * When we reached the maximum part size, then again it starts to grow the number of partitions. - * + * + * The default partition size we choose is 32mb. A multiple of this default partition size is used. + * The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the + * partition size instead of number of partitions. When we reached the maximum part size, then again + * it starts to grow the number of partitions. + * * @author Tibor Kiss */ -public class SequentialMultipartUploadStrategy implements - MultipartUploadStrategy { +public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; - private final long DEFAULT_PART_SIZE = 33554432; // 32mb + private final long DEFAULT_PART_SIZE = 33554432; // 32mb private final int MAGNITUDE_BASE = 100; - + private final AWSS3BlobStore ablobstore; + private final PayloadSlicer slicer; // calculated only once, but not from the constructor private volatile long parts; // required number of parts with chunkSize private volatile long chunkSize; private volatile long remaining; // number of bytes remained for the last part - + // sequentially updated values private volatile int part; private volatile long chunkOffset; private volatile long copied; - - public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) { - this.ablobstore = ablobstore; + + @Inject + public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) { + this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + this.slicer = checkNotNull(slicer, "slicer"); } protected long calculateChunkSize(long length) { @@ -100,43 +108,39 @@ public class SequentialMultipartUploadStrategy implements this.chunkSize = partSize; this.parts = parts; this.remaining = length - partSize * parts; - System.out.println(" " + length + " bytes partitioned in " + parts - + " parts of part size: " + chunkSize + ", remaining: " - + remaining + (remaining > MAX_PART_SIZE ? " overflow!" : "")); + logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize, + remaining, (remaining > MAX_PART_SIZE ? " overflow!" : "")); return this.chunkSize; } - + protected long getParts() { return parts; } - + protected int getNextPart() { return ++part; } - + protected void addCopied(long copied) { this.copied += copied; } - + protected long getNextChunkOffset() { long next = chunkOffset; chunkOffset += getChunkSize(); return next; } - + protected long getChunkSize() { return chunkSize; } - + protected long getRemaining() { return remaining; } - - private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, - int part, File file, long chunkOffset, long chunkSize) { - Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize); - chunkedPart.getContentMetadata().setContentLength(chunkSize); - //chukedPayload.getContentMetadata().setContentMD5(???); + + private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part, + Payload chunkedPart) { String eTag = null; try { eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); @@ -153,31 +157,27 @@ public class SequentialMultipartUploadStrategy implements @Override public String execute(String container, Blob blob) { - Payload payload = blob.getPayload(); - if (payload instanceof FilePayload) { - String key = blob.getMetadata().getName(); - File file = FilePayload.class.cast(payload).getRawContent(); - calculateChunkSize(file.length()); - long parts = getParts(); - if (parts > 0) { - AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi(); - String uploadId = client.initiateMultipartUpload(container, - ObjectMetadataBuilder.create().key(key).build()); // TODO md5 - Map etags = Maps.newHashMap(); - int part; - while ((part = getNextPart()) <= getParts()) { - String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), chunkSize); - etags.put(new Integer(part), eTag); - } - long remaining = getRemaining(); - if (remaining > 0) { - String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), remaining); - etags.put(new Integer(part), eTag); - } - return client.completeMultipartUpload(container, key, uploadId, etags); - } else { - return ablobstore.putBlob(container, blob); + String key = blob.getMetadata().getName(); + calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength()); + long parts = getParts(); + if (parts > 0) { + AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi(); + String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO + // md5 + Map etags = Maps.newHashMap(); + int part; + while ((part = getNextPart()) <= getParts()) { + String eTag = prepareUploadPart(client, container, key, uploadId, part, + slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize)); + etags.put(new Integer(part), eTag); } + long remaining = getRemaining(); + if (remaining > 0) { + String eTag = prepareUploadPart(client, container, key, uploadId, part, + slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining)); + etags.put(new Integer(part), eTag); + } + return client.completeMultipartUpload(container, key, uploadId, etags); } else { return ablobstore.putBlob(container, blob); } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java index 8478b31ba3..9f37f754c3 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java @@ -26,6 +26,7 @@ import org.jclouds.blobstore.BlobStoreContextFactory; import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer; import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest; import org.jclouds.logging.log4j.config.Log4JLoggingModule; +import org.jclouds.netty.config.NettyPayloadModule; import com.google.common.collect.ImmutableSet; import com.google.inject.Module; @@ -45,7 +46,7 @@ public class AWSS3TestInitializer extends TransientBlobStoreTestInitializer { protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion, String app, String identity, String credential) throws IOException { return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule, - new Log4JLoggingModule()), setupProperties(endpoint, apiversion, identity, credential)); + new Log4JLoggingModule(), new NettyPayloadModule()), setupProperties(endpoint, apiversion, identity, credential)); } } diff --git a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java index 13f17e9368..780aa9274f 100644 --- a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java +++ b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java @@ -30,6 +30,7 @@ import javax.inject.Named; import javax.inject.Singleton; import org.jclouds.Constants; +import org.jclouds.azure.storage.domain.BoundedSet; import org.jclouds.azureblob.AzureBlobAsyncClient; import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob; import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata; @@ -42,7 +43,6 @@ import org.jclouds.azureblob.domain.BlobProperties; import org.jclouds.azureblob.domain.ContainerProperties; import org.jclouds.azureblob.domain.ListBlobsResponse; import org.jclouds.azureblob.options.ListBlobsOptions; -import org.jclouds.azure.storage.domain.BoundedSet; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.BlobMetadata; @@ -79,18 +79,18 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore { @Inject AzureAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, - @Memoized Supplier> locations, AzureBlobAsyncClient async, - ContainerToResourceMetadata container2ResourceMd, - ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions, - ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob, - BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd, - BlobToHttpGetOptions blob2ObjectGetOptions) { + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, AzureBlobAsyncClient async, + ContainerToResourceMetadata container2ResourceMd, + ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions, + ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob, + BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd, + BlobToHttpGetOptions blob2ObjectGetOptions) { super(context, blobUtils, service, defaultLocation, locations); this.async = checkNotNull(async, "async"); this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd"); this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions, - "blobStore2AzureContainerListOptions"); + "blobStore2AzureContainerListOptions"); this.azure2BlobStoreResourceList = checkNotNull(azure2BlobStoreResourceList, "azure2BlobStoreResourceList"); this.azureBlob2Blob = checkNotNull(azureBlob2Blob, "azureBlob2Blob"); this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob"); @@ -105,15 +105,15 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures - .compose( - async.listContainers(includeMetadata()), - new Function, org.jclouds.blobstore.domain.PageSet>() { - public org.jclouds.blobstore.domain.PageSet apply( - BoundedSet from) { - return new PageSetImpl(Iterables.transform(from, container2ResourceMd), - from.getNextMarker()); - } - }, service); + .compose( + async.listContainers(includeMetadata()), + new Function, org.jclouds.blobstore.domain.PageSet>() { + public org.jclouds.blobstore.domain.PageSet apply( + BoundedSet from) { + return new PageSetImpl(Iterables.transform(from, container2ResourceMd), from + .getNextMarker()); + } + }, service); } /** @@ -243,4 +243,9 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore { throw new UnsupportedOperationException("please use deleteContainer"); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + }