diff --git a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java index a96ed0b66c..25025050e0 100644 --- a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java +++ b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosAsyncBlobStore.java @@ -250,4 +250,9 @@ public class AtmosAsyncBlobStore extends BaseAsyncBlobStore { return async.deletePath(container + "/" + key); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java index 9ffd845297..35a8197e8e 100644 --- a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java +++ b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java @@ -207,6 +207,16 @@ public class AtmosBlobStore extends BaseBlobStore { return AtmosUtils.putBlob(sync, crypto, blob2Object, container, blob); } + /** + * This implementation invokes {@link AtmosClient#createFile} + *

+ * Since there is no etag support in atmos, we just return the path. + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + /** * This implementation invokes {@link AtmosClient#deletePath} */ diff --git a/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java b/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java index 694e884d65..631f68723f 100644 --- a/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java +++ b/apis/filesystem/src/main/java/org/jclouds/filesystem/FilesystemAsyncBlobStore.java @@ -660,4 +660,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore { String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5()); return eTag; } + + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java index b647b38d84..95e757ef9e 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3AsyncBlobStore.java @@ -83,13 +83,13 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { private final Provider fetchBlobMetadataProvider; @Inject - S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, - @Memoized Supplier> locations, S3AsyncClient async, S3Client sync, - BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, - BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, - BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, - Provider fetchBlobMetadataProvider) { + protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, S3AsyncClient async, S3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider) { super(context, blobUtils, service, defaultLocation, locations); this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions"); this.async = checkNotNull(async, "async"); @@ -109,11 +109,11 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures.compose(async.listOwnedBuckets(), - new Function, org.jclouds.blobstore.domain.PageSet>() { - public org.jclouds.blobstore.domain.PageSet apply(Set from) { - return new PageSetImpl(Iterables.transform(from, bucket2ResourceMd), null); - } - }, service); + new Function, org.jclouds.blobstore.domain.PageSet>() { + public org.jclouds.blobstore.domain.PageSet apply(Set from) { + return new PageSetImpl(Iterables.transform(from, bucket2ResourceMd), null); + } + }, service); } /** @@ -153,9 +153,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { ListBucketOptions httpOptions = container2BucketListOptions.apply(options); ListenableFuture returnVal = async.listBucket(container, httpOptions); ListenableFuture> list = Futures.compose(returnVal, bucket2ResourceList, - service); + service); return (options.isDetailed()) ? Futures.compose(list, - fetchBlobMetadataProvider.get().setContainerName(container), service) : list; + fetchBlobMetadataProvider.get().setContainerName(container), service) : list; } /** @@ -238,4 +238,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore { return async.deleteObject(container, key); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java index a861dd9494..ed2cc10fa8 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java @@ -74,7 +74,7 @@ public class S3BlobStore extends BaseBlobStore { private final Provider fetchBlobMetadataProvider; @Inject - S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, + protected S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, @Memoized Supplier> locations, S3Client sync, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, @@ -225,6 +225,19 @@ public class S3BlobStore extends BaseBlobStore { return sync.putObject(container, blob2Object.apply(blob)); } + /** + * This implementation invokes {@link S3Client#putObject} + * + * @param container + * bucket name + * @param blob + * object + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return sync.putObject(container, blob2Object.apply(blob)); + } + /** * This implementation invokes {@link S3Client#deleteObject} * diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java index 815d2e5ff6..3a65f9fd0c 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/config/S3BlobStoreContextModule.java @@ -55,12 +55,16 @@ public class S3BlobStoreContextModule extends AbstractModule { bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL); bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON); bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON); - bind(BlobStoreContext.class).to(new TypeLiteral>() { - }).in(Scopes.SINGLETON); + bindContext(); bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class); bindBucketLocationStrategy(); } + protected void bindContext() { + bind(BlobStoreContext.class).to(new TypeLiteral>() { + }).in(Scopes.SINGLETON); + } + protected void bindBucketLocationStrategy() { bind(new TypeLiteral>() { }).to(LocationFromBucketLocation.class); diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java index 84eeed2afe..7e59547c8e 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java @@ -83,13 +83,13 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Inject SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, - @Memoized Supplier> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async, - ContainerToResourceMetadata container2ResourceMd, - BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, - ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, - ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider) { + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async, + ContainerToResourceMetadata container2ResourceMd, + BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, + ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, + ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, + Provider fetchBlobMetadataProvider) { super(context, blobUtils, service, defaultLocation, locations); this.sync = sync; this.async = async; @@ -109,12 +109,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures.compose(async.listContainers(), - new Function, org.jclouds.blobstore.domain.PageSet>() { - public org.jclouds.blobstore.domain.PageSet apply( - Set from) { - return new PageSetImpl(Iterables.transform(from, container2ResourceMd), null); - } - }, service); + new Function, org.jclouds.blobstore.domain.PageSet>() { + public org.jclouds.blobstore.domain.PageSet apply(Set from) { + return new PageSetImpl(Iterables.transform(from, container2ResourceMd), null); + } + }, service); } /** @@ -145,12 +144,12 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list(String container, ListContainerOptions options) { org.jclouds.openstack.swift.options.ListContainerOptions httpOptions = container2ContainerListOptions - .apply(options); + .apply(options); ListenableFuture> returnVal = async.listObjects(container, httpOptions); ListenableFuture> list = Futures.compose(returnVal, container2ResourceList, - service); + service); return options.isDetailed() ? Futures.compose(list, fetchBlobMetadataProvider.get().setContainerName(container), - service) : list; + service) : list; } /** @@ -177,14 +176,14 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture blobMetadata(String container, String key) { return Futures.compose(async.getObjectInfo(container, key), - new Function() { + new Function() { - @Override - public BlobMetadata apply(MutableObjectInfoWithMetadata from) { - return object2BlobMd.apply(from); - } + @Override + public BlobMetadata apply(MutableObjectInfoWithMetadata from) { + return object2BlobMd.apply(from); + } - }, service); + }, service); } /** @@ -235,4 +234,9 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { return !sync.containerExists(container); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index 2f26cf5f6a..009b94a6c5 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -195,6 +195,19 @@ public class SwiftBlobStore extends BaseBlobStore { return sync.putObject(container, blob2Object.apply(blob)); } + /** + * This implementation invokes {@link CommonSwiftClient#putObject} + * + * @param container + * container name + * @param blob + * object + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + /** * This implementation invokes {@link CommonSwiftClient#removeObject} * diff --git a/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java index c71c81ab72..7cb6990e89 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/AsyncBlobStore.java @@ -32,6 +32,7 @@ import org.jclouds.blobstore.options.GetOptions; import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.domain.Location; +import com.google.common.annotations.Beta; import com.google.common.util.concurrent.ListenableFuture; /** @@ -126,6 +127,12 @@ public interface AsyncBlobStore { */ ListenableFuture putBlob(String container, Blob blob); + /** + * @see BlobStore#putBlobMultipart + */ + @Beta + ListenableFuture putBlobMultipart(String container, Blob blob); + /** * @see BlobStore#blobMetadata */ diff --git a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java index 82bd9961e1..a347567ee2 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java @@ -32,6 +32,8 @@ import org.jclouds.blobstore.options.GetOptions; import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.domain.Location; +import com.google.common.annotations.Beta; + /** * Synchronous access to a BlobStore such as Amazon S3 * @@ -48,17 +50,18 @@ public interface BlobStore { /** * creates a new blob with the specified name. + * * @see #blobBuilder */ @Deprecated Blob newBlob(String name); - + /** * * @return builder for creating new {@link Blob}s */ BlobBuilder blobBuilder(String name); - + /** * The get locations command returns all the valid locations for containers. A location has a * scope, which is typically region or zone. A region is a general area, like eu-west, where a @@ -200,6 +203,23 @@ public interface BlobStore { */ String putBlob(String container, Blob blob); + /** + * Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name} + * using multipart strategies. + * + * @param container + * container to place the blob. + * @param blob + * fully qualified name relative to the container. + * @param options + * byte range or condition options + * @return etag of the blob you uploaded, possibly null where etags are unsupported + * @throws ContainerNotFoundException + * if the container doesn't exist + */ + @Beta + String putBlobMultipart(String container, Blob blob); + /** * Retrieves the metadata of a {@code Blob} at location {@code container/name} * diff --git a/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java index 706339ff1f..7afac3bbae 100755 --- a/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/TransientAsyncBlobStore.java @@ -675,4 +675,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore { return containerToLocation; } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + // TODO implement + return putBlob(container, blob); + } + } diff --git a/core/src/main/java/org/jclouds/io/PayloadSlicer.java b/core/src/main/java/org/jclouds/io/PayloadSlicer.java new file mode 100644 index 0000000000..fc0092af76 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/PayloadSlicer.java @@ -0,0 +1,47 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io; + +import org.jclouds.io.internal.BasePayloadSlicer; + +import com.google.inject.ImplementedBy; + + +/** + * + * @author Adrian Cole + */ +@ImplementedBy(BasePayloadSlicer.class) +public interface PayloadSlicer { + /** + * Returns a {@link Payload} that returns input streams from the an underlying payload, where + * each stream starts at the given offset and is limited to the specified number of bytes. + * + * @param input + * the payload from which to get the raw streams + * @param offset + * the offset in bytes into the underlying stream where the returned streams will start + * @param length + * the maximum length of the returned streams + * @throws IllegalArgumentException + * if offset or length are negative + */ + Payload slice(Payload input, long offset, long length); +} \ No newline at end of file diff --git a/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java b/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java new file mode 100644 index 0000000000..545b1e3f51 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java @@ -0,0 +1,104 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io.internal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; + +import javax.inject.Singleton; + +import org.jclouds.io.InputSuppliers; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.io.payloads.BaseMutableContentMetadata; +import org.jclouds.io.payloads.InputStreamSupplierPayload; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteStreams; + +/** + * + * @author Adrian Cole + */ +@Singleton +public class BasePayloadSlicer implements PayloadSlicer { + /** + * {@inheritDoc} + */ + @Override + public Payload slice(Payload input, long offset, long length) { + checkNotNull(input); + checkArgument(offset >= 0, "offset is negative"); + checkArgument(length >= 0, "length is negative"); + Payload returnVal; + if (input.getRawContent() instanceof File) { + returnVal = doSlice((File) input.getRawContent(), offset, length); + } else if (input.getRawContent() instanceof String) { + returnVal = doSlice((byte[]) input.getRawContent(), offset, length); + } else if (input.getRawContent() instanceof byte[]) { + returnVal = doSlice((byte[]) input.getRawContent(), offset, length); + } else { + returnVal = doSlice(input.getInput(), offset, length); + } + return copyMetadataAndSetLength(input, returnVal, length); + } + + protected Payload doSlice(Payload content, long offset, long length) { + return new InputStreamSupplierPayload(ByteStreams.slice(content, offset, length)); + } + + protected Payload doSlice(String content, long offset, long length) { + return doSlice(content.getBytes(), offset, length); + } + + protected Payload doSlice(File content, long offset, long length) { + try { + return doSlice(new FileInputStream(content), offset, length); + } catch (FileNotFoundException e) { + Throwables.propagate(e); + return null; + } + } + + protected Payload doSlice(InputStream content, long offset, long length) { + return new InputStreamSupplierPayload(ByteStreams.slice(InputSuppliers.of(content), offset, length)); + } + + protected Payload doSlice(byte[] content, long offset, long length) { + Payload returnVal; + checkArgument(offset <= Integer.MAX_VALUE, "offset is too big for an array"); + checkArgument(length <= Integer.MAX_VALUE, "length is too big for an array"); + returnVal = new InputStreamSupplierPayload( + ByteStreams.newInputStreamSupplier(content, (int) offset, (int) length)); + return returnVal; + } + + protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, long length) { + returnVal.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata() + .toBuilder().contentLength(length).contentMD5(null).build())); + return returnVal; + } + +} \ No newline at end of file diff --git a/core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java b/core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java new file mode 100644 index 0000000000..2dd1043a29 --- /dev/null +++ b/core/src/main/java/org/jclouds/io/payloads/InputStreamSupplierPayload.java @@ -0,0 +1,75 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.io.payloads; + +import static com.google.common.io.Closeables.closeQuietly; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.io.InputSupplier; + +/** + * @author Adrian Cole + */ +public class InputStreamSupplierPayload extends BasePayload> { + private List toClose = Lists.newArrayList(); + + public InputStreamSupplierPayload(InputSupplier content) { + super(content); + } + + /** + * {@inheritDoc} + */ + @Override + public InputStream getInput() { + try { + InputStream returnVal = content.getInput(); + toClose.add(returnVal); + return returnVal; + } catch (IOException e) { + Throwables.propagate(e); + return null; + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isRepeatable() { + return true; + } + + /** + * if we created the stream, then it is already consumed on close. + */ + @Override + public void release() { + if (toClose.size() > 0) + for (InputStream content = toClose.remove(0); toClose.size() > 0; content = toClose.remove(0)) + closeQuietly(content); + } + +} \ No newline at end of file diff --git a/drivers/netty/.pom.xml.swp b/drivers/netty/.pom.xml.swp new file mode 100644 index 0000000000..d57d9b18a0 Binary files /dev/null and b/drivers/netty/.pom.xml.swp differ diff --git a/drivers/netty/pom.xml b/drivers/netty/pom.xml new file mode 100644 index 0000000000..2fd2805a5a --- /dev/null +++ b/drivers/netty/pom.xml @@ -0,0 +1,82 @@ + + + + + 4.0.0 + + org.jclouds + jclouds-project + 1.0-SNAPSHOT + ../../project/pom.xml + + org.jclouds.driver + jclouds-netty + jclouds netty payload module + jclouds netty payload module + + + + + jclouds-sona-snapshots-nexus + https://oss.sonatype.org/content/repositories/snapshots + + false + + + true + + + + jboss-public-releases + https://repository.jboss.org/nexus/content/groups/public-jboss + + true + + + false + + + + + + + org.jclouds + jclouds-core + ${project.version} + + + org.jclouds + jclouds-core + ${project.version} + test-jar + test + + + org.jboss.netty + netty + 3.2.4.Final + provided + + + + diff --git a/drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java b/drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java new file mode 100644 index 0000000000..c0aedb5563 --- /dev/null +++ b/drivers/netty/src/main/java/org/jclouds/netty/config/NettyPayloadModule.java @@ -0,0 +1,20 @@ +package org.jclouds.netty.config; + +import org.jclouds.io.PayloadSlicer; +import org.jclouds.netty.io.NettyPayloadSlicer; + +import com.google.inject.AbstractModule; + +/** + * + * @author Adrian Cole + * + */ +public class NettyPayloadModule extends AbstractModule { + + @Override + protected void configure() { + bind(PayloadSlicer.class).to(NettyPayloadSlicer.class); + } + +} diff --git a/drivers/netty/src/main/java/org/jclouds/netty/io/ChunkedFileInputStream.java b/drivers/netty/src/main/java/org/jclouds/netty/io/ChunkedFileInputStream.java new file mode 100644 index 0000000000..73436f1195 --- /dev/null +++ b/drivers/netty/src/main/java/org/jclouds/netty/io/ChunkedFileInputStream.java @@ -0,0 +1,125 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.netty.io; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.stream.ChunkedFile; + +/** + * + * + * + * + * @author Tibor Kiss + */ +public class ChunkedFileInputStream extends InputStream { + + private static final int CHUNK_SIZE = 8192; + + private ChunkedFile chunks; + private ChannelBuffer chunk; + + private IOException ex; + + public ChunkedFileInputStream(String filename, long offset, long length) { + this(new File(filename), offset, length); + } + + public ChunkedFileInputStream(File file, long offset, long length) { + try { + this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE); + } catch (IOException ex) { + this.ex = ex; + } + } + + private ChannelBuffer getChunk() throws Exception { + if (ex != null) { + throw ex; + } + if (chunk == null) { + chunk = ChannelBuffer.class.cast(chunks.nextChunk()); + } + if (chunk != null) { + if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) { + chunk = ChannelBuffer.class.cast(chunks.nextChunk()); + if (chunk.readableBytes() < 1) { + return null; + } + } + } else { + return null; + } + return chunk; + } + + @Override + public int read() throws IOException { + try { + ChannelBuffer chunk = getChunk(); + if (chunk == null) + return -1; + if (chunk.readableBytes() < 1) + return -1; + int readIndex = chunk.readerIndex(); + byte abyte = chunk.getByte(readIndex); + chunk.readerIndex(readIndex + 1); + return (int) abyte; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + ChannelBuffer chunk = getChunk(); + if (chunk == null) + return -1; + int readable = chunk.readableBytes(); + if (readable < 1) + return -1; + if (readable > len) { + readable = len; + } + int readIndex = chunk.readerIndex(); + chunk.getBytes(readIndex, b, off, readable); + chunk.readerIndex(readIndex + readable); + return readable; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + try { + chunks.close(); + } catch (Exception e) { + throw new IOException(e); + } + } + +} diff --git a/drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java b/drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java new file mode 100644 index 0000000000..b130936d40 --- /dev/null +++ b/drivers/netty/src/main/java/org/jclouds/netty/io/NettyPayloadSlicer.java @@ -0,0 +1,42 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.netty.io; + +import java.io.File; + +import javax.inject.Singleton; + +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; +import org.jclouds.io.internal.BasePayloadSlicer; + +/** + * + * @author Adrian Cole + */ +@Singleton +public class NettyPayloadSlicer extends BasePayloadSlicer { + + @Override + protected Payload doSlice(File content, long offset, long length) { + return Payloads.newInputStreamPayload(new ChunkedFileInputStream(content, offset, length)); + } + +} \ No newline at end of file diff --git a/drivers/pom.xml b/drivers/pom.xml index e151791a72..a5d240a0da 100644 --- a/drivers/pom.xml +++ b/drivers/pom.xml @@ -38,6 +38,7 @@ bouncycastle log4j jsch + netty enterprise diff --git a/providers/aws-s3/pom.xml b/providers/aws-s3/pom.xml index 9fea31babb..48048825a6 100644 --- a/providers/aws-s3/pom.xml +++ b/providers/aws-s3/pom.xml @@ -42,6 +42,21 @@ ${test.aws.credential} + + + + jboss-public-releases + https://repository.jboss.org/nexus/content/groups/public-jboss + + true + + + false + + + + + org.jclouds.api @@ -94,6 +109,18 @@ 1.2.16 test + + org.jclouds.driver + jclouds-netty + ${project.version} + test + + + org.jboss.netty + netty + 3.2.4.Final + test + diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java index 2a717f8eaf..bdb5d8a8bb 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3ContextBuilder.java @@ -22,6 +22,7 @@ package org.jclouds.aws.s3; import java.util.List; import java.util.Properties; +import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule; import org.jclouds.aws.s3.config.AWSS3RestClientModule; import org.jclouds.s3.S3ContextBuilder; @@ -37,6 +38,11 @@ public class AWSS3ContextBuilder extends S3ContextBuilder { super(props); } + @Override + protected void addContextModule(List modules) { + modules.add(new AWSS3BlobStoreContextModule()); + } + @Override protected void addClientModule(List modules) { modules.add(new AWSS3RestClientModule()); diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java new file mode 100644 index 0000000000..7c83451d57 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java @@ -0,0 +1,82 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore; + +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Provider; + +import org.jclouds.Constants; +import org.jclouds.aws.s3.AWSS3AsyncClient; +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.functions.BlobToHttpGetOptions; +import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata; +import org.jclouds.blobstore.util.BlobUtils; +import org.jclouds.collect.Memoized; +import org.jclouds.domain.Location; +import org.jclouds.s3.blobstore.S3AsyncBlobStore; +import org.jclouds.s3.blobstore.functions.BlobToObject; +import org.jclouds.s3.blobstore.functions.BucketToResourceList; +import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata; +import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions; +import org.jclouds.s3.blobstore.functions.ObjectToBlob; +import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; + +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * + * @author Tibor Kiss + */ +public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { + + private final Provider multipartUploadStrategy; + + @Inject + public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, AWSS3AsyncClient async, AWSS3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider, + Provider multipartUploadStrategy) { + super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd, + container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, + object2BlobMd, fetchBlobMetadataProvider); + this.multipartUploadStrategy = multipartUploadStrategy; + } + + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + // TODO: make this better + // need to use a provider if the strategy object is stateful + return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob)); + } + +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java new file mode 100644 index 0000000000..7e3983b4a3 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java @@ -0,0 +1,73 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore; + +import java.util.Set; + +import javax.inject.Inject; +import javax.inject.Provider; + +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.functions.BlobToHttpGetOptions; +import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata; +import org.jclouds.blobstore.util.BlobUtils; +import org.jclouds.collect.Memoized; +import org.jclouds.domain.Location; +import org.jclouds.s3.blobstore.S3BlobStore; +import org.jclouds.s3.blobstore.functions.BlobToObject; +import org.jclouds.s3.blobstore.functions.BucketToResourceList; +import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata; +import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions; +import org.jclouds.s3.blobstore.functions.ObjectToBlob; +import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; + +import com.google.common.base.Supplier; + +/** + * Proived AWS S3 specific extensions. + * + * @author Tibor Kiss + */ +public class AWSS3BlobStore extends S3BlobStore { + + private final Provider multipartUploadStrategy; + + @Inject + AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, + @Memoized Supplier> locations, AWSS3Client sync, + BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, + BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, + BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, + Provider fetchBlobMetadataProvider, Provider multipartUploadStrategy) { + super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions, + bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd, + fetchBlobMetadataProvider); + this.multipartUploadStrategy = multipartUploadStrategy; + } + + @Override + public String putBlobMultipart(String container, Blob blob) { + // need to use a provider if the strategy object is stateful + return multipartUploadStrategy.get().execute(container, blob); + } +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java new file mode 100644 index 0000000000..75cea06094 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java @@ -0,0 +1,58 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.config; + +import org.jclouds.aws.s3.AWSS3AsyncClient; +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore; +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.internal.BlobStoreContextImpl; +import org.jclouds.s3.blobstore.S3AsyncBlobStore; +import org.jclouds.s3.blobstore.S3BlobStore; +import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule; + +import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; + +/** + * + * + * @author Tibor Kiss + */ +public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule { + + @Override + protected void configure() { + super.configure(); + bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON); + bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON); + bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class); + } + + @Override + protected void bindContext() { + bind(BlobStoreContext.class).to(new TypeLiteral>() { + }).in(Scopes.SINGLETON); + } + +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java new file mode 100644 index 0000000000..fe4a91bfa6 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java @@ -0,0 +1,50 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy; + +import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; +import org.jclouds.blobstore.domain.Blob; + +import com.google.inject.ImplementedBy; + +/** + * @see + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; + +import javax.annotation.Resource; +import javax.inject.Inject; +import javax.inject.Named; + +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.blobstore.KeyNotFoundException; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.logging.Logger; +import org.jclouds.s3.domain.ObjectMetadataBuilder; + +import com.google.common.collect.Maps; + +/** + * Provides a sequential multipart upload strategy. + * + * The file partitioning algorithm: + * + * The default partition size we choose is 32mb. A multiple of this default partition size is used. + * The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the + * partition size instead of number of partitions. When we reached the maximum part size, then again + * it starts to grow the number of partitions. + * + * @author Tibor Kiss + */ +public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + private final long DEFAULT_PART_SIZE = 33554432; // 32mb + private final int MAGNITUDE_BASE = 100; + + private final AWSS3BlobStore ablobstore; + private final PayloadSlicer slicer; + + // calculated only once, but not from the constructor + private volatile long parts; // required number of parts with chunkSize + private volatile long chunkSize; + private volatile long remaining; // number of bytes remained for the last part + + // sequentially updated values + private volatile int part; + private volatile long chunkOffset; + private volatile long copied; + + @Inject + public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) { + this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + this.slicer = checkNotNull(slicer, "slicer"); + } + + protected long calculateChunkSize(long length) { + long unitPartSize = DEFAULT_PART_SIZE; // first try with default part size + long parts = length / unitPartSize; + long partSize = unitPartSize; + int magnitude = (int) (parts / MAGNITUDE_BASE); + if (magnitude > 0) { + partSize = magnitude * unitPartSize; + if (partSize > MAX_PART_SIZE) { + partSize = MAX_PART_SIZE; + unitPartSize = MAX_PART_SIZE; + } + parts = length / partSize; + } + if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or + // cannot be split + unitPartSize = MIN_PART_SIZE; // take the minimum part size + parts = length / unitPartSize; + } + if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts + parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not + // covering + } + long remainder = length % unitPartSize; + if (remainder == 0 && parts > 0) { + parts -= 1; + } + this.chunkSize = partSize; + this.parts = parts; + this.remaining = length - partSize * parts; + logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize, + remaining, (remaining > MAX_PART_SIZE ? " overflow!" : "")); + return this.chunkSize; + } + + protected long getParts() { + return parts; + } + + protected int getNextPart() { + return ++part; + } + + protected void addCopied(long copied) { + this.copied += copied; + } + + protected long getNextChunkOffset() { + long next = chunkOffset; + chunkOffset += getChunkSize(); + return next; + } + + protected long getChunkSize() { + return chunkSize; + } + + protected long getRemaining() { + return remaining; + } + + private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part, + Payload chunkedPart) { + String eTag = null; + try { + eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + } catch (KeyNotFoundException e) { + // note that because of eventual consistency, the upload id may not be present yet + // we may wish to add this condition to the retry handler + + // we may also choose to implement ListParts and wait for the uploadId to become + // available there. + eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + } + return eTag; + } + + @Override + public String execute(String container, Blob blob) { + String key = blob.getMetadata().getName(); + calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength()); + long parts = getParts(); + if (parts > 0) { + AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi(); + String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO + // md5 + Map etags = Maps.newHashMap(); + int part; + while ((part = getNextPart()) <= getParts()) { + String eTag = prepareUploadPart(client, container, key, uploadId, part, + slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize)); + etags.put(new Integer(part), eTag); + } + long remaining = getRemaining(); + if (remaining > 0) { + String eTag = prepareUploadPart(client, container, key, uploadId, part, + slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining)); + etags.put(new Integer(part), eTag); + } + return client.completeMultipartUpload(container, key, uploadId, etags); + } else { + return ablobstore.putBlob(container, blob); + } + } +} diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java index 5e54c2f966..6833bda5b1 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/AWSS3ClientLiveTest.java @@ -27,11 +27,15 @@ import static org.jclouds.io.Payloads.newByteArrayPayload; import static org.testng.Assert.assertEquals; import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.zip.GZIPInputStream; +import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.KeyNotFoundException; +import org.jclouds.blobstore.domain.Blob; import org.jclouds.http.BaseJettyTest; import org.jclouds.http.apachehc.config.ApacheHCHttpCommandExecutorServiceModule; import org.jclouds.io.Payload; @@ -43,6 +47,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteStreams; import com.google.common.io.InputSupplier; import com.google.inject.Module; @@ -134,4 +139,23 @@ public class AWSS3ClientLiveTest extends S3ClientLiveTest { returnContainer(containerName); } } + + public void testMultipartChunkedFileStream() throws IOException, InterruptedException { + + FileOutputStream fous = new FileOutputStream(new File("target/const.txt")); + ByteStreams.copy(oneHundredOneConstitutions.getInput(), fous); + fous.flush(); + fous.close(); + String containerName = getContainerName(); + + try { + BlobStore blobStore = context.getBlobStore(); + blobStore.createContainerInLocation(null, containerName); + Blob blob = blobStore.blobBuilder("const.txt") + .payload(new File("target/const.txt")).build(); + blobStore.putBlobMultipart(containerName, blob); + } finally { + returnContainer(containerName); + } + } } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java index 8478b31ba3..9f37f754c3 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/integration/AWSS3TestInitializer.java @@ -26,6 +26,7 @@ import org.jclouds.blobstore.BlobStoreContextFactory; import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer; import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest; import org.jclouds.logging.log4j.config.Log4JLoggingModule; +import org.jclouds.netty.config.NettyPayloadModule; import com.google.common.collect.ImmutableSet; import com.google.inject.Module; @@ -45,7 +46,7 @@ public class AWSS3TestInitializer extends TransientBlobStoreTestInitializer { protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion, String app, String identity, String credential) throws IOException { return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule, - new Log4JLoggingModule()), setupProperties(endpoint, apiversion, identity, credential)); + new Log4JLoggingModule(), new NettyPayloadModule()), setupProperties(endpoint, apiversion, identity, credential)); } } diff --git a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java index 13f17e9368..780aa9274f 100644 --- a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java +++ b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureAsyncBlobStore.java @@ -30,6 +30,7 @@ import javax.inject.Named; import javax.inject.Singleton; import org.jclouds.Constants; +import org.jclouds.azure.storage.domain.BoundedSet; import org.jclouds.azureblob.AzureBlobAsyncClient; import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob; import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata; @@ -42,7 +43,6 @@ import org.jclouds.azureblob.domain.BlobProperties; import org.jclouds.azureblob.domain.ContainerProperties; import org.jclouds.azureblob.domain.ListBlobsResponse; import org.jclouds.azureblob.options.ListBlobsOptions; -import org.jclouds.azure.storage.domain.BoundedSet; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.BlobMetadata; @@ -79,18 +79,18 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore { @Inject AzureAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, - @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, - @Memoized Supplier> locations, AzureBlobAsyncClient async, - ContainerToResourceMetadata container2ResourceMd, - ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions, - ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob, - BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd, - BlobToHttpGetOptions blob2ObjectGetOptions) { + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier defaultLocation, + @Memoized Supplier> locations, AzureBlobAsyncClient async, + ContainerToResourceMetadata container2ResourceMd, + ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions, + ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob, + BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd, + BlobToHttpGetOptions blob2ObjectGetOptions) { super(context, blobUtils, service, defaultLocation, locations); this.async = checkNotNull(async, "async"); this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd"); this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions, - "blobStore2AzureContainerListOptions"); + "blobStore2AzureContainerListOptions"); this.azure2BlobStoreResourceList = checkNotNull(azure2BlobStoreResourceList, "azure2BlobStoreResourceList"); this.azureBlob2Blob = checkNotNull(azureBlob2Blob, "azureBlob2Blob"); this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob"); @@ -105,15 +105,15 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture> list() { return Futures - .compose( - async.listContainers(includeMetadata()), - new Function, org.jclouds.blobstore.domain.PageSet>() { - public org.jclouds.blobstore.domain.PageSet apply( - BoundedSet from) { - return new PageSetImpl(Iterables.transform(from, container2ResourceMd), - from.getNextMarker()); - } - }, service); + .compose( + async.listContainers(includeMetadata()), + new Function, org.jclouds.blobstore.domain.PageSet>() { + public org.jclouds.blobstore.domain.PageSet apply( + BoundedSet from) { + return new PageSetImpl(Iterables.transform(from, container2ResourceMd), from + .getNextMarker()); + } + }, service); } /** @@ -243,4 +243,9 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore { throw new UnsupportedOperationException("please use deleteContainer"); } + @Override + public ListenableFuture putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + } diff --git a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java index 3764648c31..88b0a11f19 100644 --- a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java +++ b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java @@ -192,6 +192,19 @@ public class AzureBlobStore extends BaseBlobStore { return sync.putBlob(container, blob2AzureBlob.apply(blob)); } + /** + * This implementation invokes {@link AzureBlobClient#putObject} + * + * @param container + * container name + * @param blob + * object + */ + @Override + public String putBlobMultipart(String container, Blob blob) { + return putBlob(container, blob); + } + /** * This implementation invokes {@link AzureBlobClient#deleteObject} *