Merge branch 'large-blob' of git@github.com:jclouds/jclouds

* 'large-blob' of git@github.com:jclouds/jclouds:
  Issue 486: moved netty to a driver and created a base payload slicer
  AWS S3 sequential multipart upload strategy, newly added files.
  AWS S3 sequential Multipart Upload strategy
This commit is contained in:
Adrian Cole 2011-03-09 13:42:02 -08:00
commit 2b43975cc8
31 changed files with 1173 additions and 60 deletions

View File

@ -250,4 +250,9 @@ public class AtmosAsyncBlobStore extends BaseAsyncBlobStore {
return async.deletePath(container + "/" + key);
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -207,6 +207,16 @@ public class AtmosBlobStore extends BaseBlobStore {
return AtmosUtils.putBlob(sync, crypto, blob2Object, container, blob);
}
/**
* This implementation invokes {@link AtmosClient#createFile}
* <p/>
* Since there is no etag support in atmos, we just return the path.
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
/**
* This implementation invokes {@link AtmosClient#deletePath}
*/

View File

@ -660,4 +660,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
return eTag;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -83,7 +83,7 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
@Inject
S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
@ -238,4 +238,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
return async.deleteObject(container, key);
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -74,7 +74,7 @@ public class S3BlobStore extends BaseBlobStore {
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
@Inject
S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
protected S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
@ -225,6 +225,19 @@ public class S3BlobStore extends BaseBlobStore {
return sync.putObject(container, blob2Object.apply(blob));
}
/**
* This implementation invokes {@link S3Client#putObject}
*
* @param container
* bucket name
* @param blob
* object
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return sync.putObject(container, blob2Object.apply(blob));
}
/**
* This implementation invokes {@link S3Client#deleteObject}
*

View File

@ -55,12 +55,16 @@ public class S3BlobStoreContextModule extends AbstractModule {
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON);
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
}).in(Scopes.SINGLETON);
bindContext();
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
bindBucketLocationStrategy();
}
protected void bindContext() {
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
}).in(Scopes.SINGLETON);
}
protected void bindBucketLocationStrategy() {
bind(new TypeLiteral<Function<BucketMetadata, Location>>() {
}).to(LocationFromBucketLocation.class);

View File

@ -110,8 +110,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
return Futures.compose(async.listContainers(),
new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
Set<ContainerMetadata> from) {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<ContainerMetadata> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null);
}
}, service);
@ -235,4 +234,9 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
return !sync.containerExists(container);
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -195,6 +195,19 @@ public class SwiftBlobStore extends BaseBlobStore {
return sync.putObject(container, blob2Object.apply(blob));
}
/**
* This implementation invokes {@link CommonSwiftClient#putObject}
*
* @param container
* container name
* @param blob
* object
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
/**
* This implementation invokes {@link CommonSwiftClient#removeObject}
*

View File

@ -32,6 +32,7 @@ import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.ListenableFuture;
/**
@ -126,6 +127,12 @@ public interface AsyncBlobStore {
*/
ListenableFuture<String> putBlob(String container, Blob blob);
/**
* @see BlobStore#putBlobMultipart
*/
@Beta
ListenableFuture<String> putBlobMultipart(String container, Blob blob);
/**
* @see BlobStore#blobMetadata
*/

View File

@ -32,6 +32,8 @@ import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import com.google.common.annotations.Beta;
/**
* Synchronous access to a BlobStore such as Amazon S3
*
@ -48,6 +50,7 @@ public interface BlobStore {
/**
* creates a new blob with the specified name.
*
* @see #blobBuilder
*/
@Deprecated
@ -200,6 +203,23 @@ public interface BlobStore {
*/
String putBlob(String container, Blob blob);
/**
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
* using multipart strategies.
*
* @param container
* container to place the blob.
* @param blob
* fully qualified name relative to the container.
* @param options
* byte range or condition options
* @return etag of the blob you uploaded, possibly null where etags are unsupported
* @throws ContainerNotFoundException
* if the container doesn't exist
*/
@Beta
String putBlobMultipart(String container, Blob blob);
/**
* Retrieves the metadata of a {@code Blob} at location {@code container/name}
*

View File

@ -675,4 +675,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return containerToLocation;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
// TODO implement
return putBlob(container, blob);
}
}

View File

@ -0,0 +1,47 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io;
import org.jclouds.io.internal.BasePayloadSlicer;
import com.google.inject.ImplementedBy;
/**
*
* @author Adrian Cole
*/
@ImplementedBy(BasePayloadSlicer.class)
public interface PayloadSlicer {
/**
* Returns a {@link Payload} that returns input streams from the an underlying payload, where
* each stream starts at the given offset and is limited to the specified number of bytes.
*
* @param input
* the payload from which to get the raw streams
* @param offset
* the offset in bytes into the underlying stream where the returned streams will start
* @param length
* the maximum length of the returned streams
* @throws IllegalArgumentException
* if offset or length are negative
*/
Payload slice(Payload input, long offset, long length);
}

View File

@ -0,0 +1,104 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import javax.inject.Singleton;
import org.jclouds.io.InputSuppliers;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.payloads.BaseMutableContentMetadata;
import org.jclouds.io.payloads.InputStreamSupplierPayload;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
/**
*
* @author Adrian Cole
*/
@Singleton
public class BasePayloadSlicer implements PayloadSlicer {
/**
* {@inheritDoc}
*/
@Override
public Payload slice(Payload input, long offset, long length) {
checkNotNull(input);
checkArgument(offset >= 0, "offset is negative");
checkArgument(length >= 0, "length is negative");
Payload returnVal;
if (input.getRawContent() instanceof File) {
returnVal = doSlice((File) input.getRawContent(), offset, length);
} else if (input.getRawContent() instanceof String) {
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
} else if (input.getRawContent() instanceof byte[]) {
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
} else {
returnVal = doSlice(input.getInput(), offset, length);
}
return copyMetadataAndSetLength(input, returnVal, length);
}
protected Payload doSlice(Payload content, long offset, long length) {
return new InputStreamSupplierPayload(ByteStreams.slice(content, offset, length));
}
protected Payload doSlice(String content, long offset, long length) {
return doSlice(content.getBytes(), offset, length);
}
protected Payload doSlice(File content, long offset, long length) {
try {
return doSlice(new FileInputStream(content), offset, length);
} catch (FileNotFoundException e) {
Throwables.propagate(e);
return null;
}
}
protected Payload doSlice(InputStream content, long offset, long length) {
return new InputStreamSupplierPayload(ByteStreams.slice(InputSuppliers.of(content), offset, length));
}
protected Payload doSlice(byte[] content, long offset, long length) {
Payload returnVal;
checkArgument(offset <= Integer.MAX_VALUE, "offset is too big for an array");
checkArgument(length <= Integer.MAX_VALUE, "length is too big for an array");
returnVal = new InputStreamSupplierPayload(
ByteStreams.newInputStreamSupplier(content, (int) offset, (int) length));
return returnVal;
}
protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, long length) {
returnVal.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata()
.toBuilder().contentLength(length).contentMD5(null).build()));
return returnVal;
}
}

View File

@ -0,0 +1,75 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import static com.google.common.io.Closeables.closeQuietly;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
/**
* @author Adrian Cole
*/
public class InputStreamSupplierPayload extends BasePayload<InputSupplier<? extends InputStream>> {
private List<InputStream> toClose = Lists.newArrayList();
public InputStreamSupplierPayload(InputSupplier<? extends InputStream> content) {
super(content);
}
/**
* {@inheritDoc}
*/
@Override
public InputStream getInput() {
try {
InputStream returnVal = content.getInput();
toClose.add(returnVal);
return returnVal;
} catch (IOException e) {
Throwables.propagate(e);
return null;
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean isRepeatable() {
return true;
}
/**
* if we created the stream, then it is already consumed on close.
*/
@Override
public void release() {
if (toClose.size() > 0)
for (InputStream content = toClose.remove(0); toClose.size() > 0; content = toClose.remove(0))
closeQuietly(content);
}
}

BIN
drivers/netty/.pom.xml.swp Normal file

Binary file not shown.

82
drivers/netty/pom.xml Normal file
View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
====================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
====================================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-project</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../project/pom.xml</relativePath>
</parent>
<groupId>org.jclouds.driver</groupId>
<artifactId>jclouds-netty</artifactId>
<name>jclouds netty payload module</name>
<description>jclouds netty payload module</description>
<!-- bootstrapping: need to fetch the project POM -->
<repositories>
<repository>
<id>jclouds-sona-snapshots-nexus</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>jboss-public-releases</id>
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,20 @@
package org.jclouds.netty.config;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.netty.io.NettyPayloadSlicer;
import com.google.inject.AbstractModule;
/**
*
* @author Adrian Cole
*
*/
public class NettyPayloadModule extends AbstractModule {
@Override
protected void configure() {
bind(PayloadSlicer.class).to(NettyPayloadSlicer.class);
}
}

View File

@ -0,0 +1,125 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.netty.io;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.stream.ChunkedFile;
/**
*
*
*
*
* @author Tibor Kiss
*/
public class ChunkedFileInputStream extends InputStream {
private static final int CHUNK_SIZE = 8192;
private ChunkedFile chunks;
private ChannelBuffer chunk;
private IOException ex;
public ChunkedFileInputStream(String filename, long offset, long length) {
this(new File(filename), offset, length);
}
public ChunkedFileInputStream(File file, long offset, long length) {
try {
this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE);
} catch (IOException ex) {
this.ex = ex;
}
}
private ChannelBuffer getChunk() throws Exception {
if (ex != null) {
throw ex;
}
if (chunk == null) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
}
if (chunk != null) {
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
if (chunk.readableBytes() < 1) {
return null;
}
}
} else {
return null;
}
return chunk;
}
@Override
public int read() throws IOException {
try {
ChannelBuffer chunk = getChunk();
if (chunk == null)
return -1;
if (chunk.readableBytes() < 1)
return -1;
int readIndex = chunk.readerIndex();
byte abyte = chunk.getByte(readIndex);
chunk.readerIndex(readIndex + 1);
return (int) abyte;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
ChannelBuffer chunk = getChunk();
if (chunk == null)
return -1;
int readable = chunk.readableBytes();
if (readable < 1)
return -1;
if (readable > len) {
readable = len;
}
int readIndex = chunk.readerIndex();
chunk.getBytes(readIndex, b, off, readable);
chunk.readerIndex(readIndex + readable);
return readable;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public void close() throws IOException {
try {
chunks.close();
} catch (Exception e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,42 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.netty.io;
import java.io.File;
import javax.inject.Singleton;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.internal.BasePayloadSlicer;
/**
*
* @author Adrian Cole
*/
@Singleton
public class NettyPayloadSlicer extends BasePayloadSlicer {
@Override
protected Payload doSlice(File content, long offset, long length) {
return Payloads.newInputStreamPayload(new ChunkedFileInputStream(content, offset, length));
}
}

View File

@ -38,6 +38,7 @@
<module>bouncycastle</module>
<module>log4j</module>
<module>jsch</module>
<module>netty</module>
<module>enterprise</module>
</modules>
</project>

View File

@ -42,6 +42,21 @@
<test.aws-s3.credential>${test.aws.credential}</test.aws-s3.credential>
</properties>
<!-- temporary -->
<repositories>
<repository>
<id>jboss-public-releases</id>
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jclouds.api</groupId>
@ -94,6 +109,18 @@
<version>1.2.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jclouds.driver</groupId>
<artifactId>jclouds-netty</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -22,6 +22,7 @@ package org.jclouds.aws.s3;
import java.util.List;
import java.util.Properties;
import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule;
import org.jclouds.aws.s3.config.AWSS3RestClientModule;
import org.jclouds.s3.S3ContextBuilder;
@ -37,6 +38,11 @@ public class AWSS3ContextBuilder extends S3ContextBuilder {
super(props);
}
@Override
protected void addContextModule(List<Module> modules) {
modules.add(new AWSS3BlobStoreContextModule());
}
@Override
protected void addClientModule(List<Module> modules) {
modules.add(new AWSS3RestClientModule());

View File

@ -0,0 +1,82 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import org.jclouds.Constants;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.s3.blobstore.S3AsyncBlobStore;
import org.jclouds.s3.blobstore.functions.BlobToObject;
import org.jclouds.s3.blobstore.functions.BucketToResourceList;
import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/**
*
* @author Tibor Kiss
*/
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3AsyncClient async, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object,
object2BlobMd, fetchBlobMetadataProvider);
this.multipartUploadStrategy = multipartUploadStrategy;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
// TODO: make this better
// need to use a provider if the strategy object is stateful
return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob));
}
}

View File

@ -0,0 +1,73 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Provider;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.functions.BlobToObject;
import org.jclouds.s3.blobstore.functions.BucketToResourceList;
import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
/**
* Proived AWS S3 specific extensions.
*
* @author Tibor Kiss
*/
public class AWSS3BlobStore extends S3BlobStore {
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject
AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions,
bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd,
fetchBlobMetadataProvider);
this.multipartUploadStrategy = multipartUploadStrategy;
}
@Override
public String putBlobMultipart(String container, Blob blob) {
// need to use a provider if the strategy object is stateful
return multipartUploadStrategy.get().execute(container, blob);
}
}

View File

@ -0,0 +1,58 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.config;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
import org.jclouds.s3.blobstore.S3AsyncBlobStore;
import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
/**
*
*
* @author Tibor Kiss
*/
public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
@Override
protected void configure() {
super.configure();
bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
}
@Override
protected void bindContext() {
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
}).in(Scopes.SINGLETON);
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.strategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.domain.Blob;
import com.google.inject.ImplementedBy;
/**
* @see <a href="http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?qfacts.html"
*
* @author Tibor Kiss
*/
@ImplementedBy(SequentialMultipartUploadStrategy.class)
public interface MultipartUploadStrategy {
/* Maximum number of parts per upload */
public static final int MAX_NUMBER_OF_PARTS = 10000;
/* Maximum number of parts returned for a list parts request */
public static final int MAX_LIST_PARTS_RETURNED = 1000;
/* Maximum number of multipart uploads returned in a list multipart uploads request */
public static final int MAX_LIST_MPU_RETURNED = 1000;
/*
* part size 5 MB to 5 GB, last part can be < 5 MB
*/
public static final long MIN_PART_SIZE = 5242880L;
public static final long MAX_PART_SIZE = 5368709120L;
String execute(String container, Blob blob);
}

View File

@ -0,0 +1,185 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.strategy.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.logging.Logger;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import com.google.common.collect.Maps;
/**
* Provides a sequential multipart upload strategy.
*
* The file partitioning algorithm:
*
* The default partition size we choose is 32mb. A multiple of this default partition size is used.
* The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the
* partition size instead of number of partitions. When we reached the maximum part size, then again
* it starts to grow the number of partitions.
*
* @author Tibor Kiss
*/
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
@Resource
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
protected Logger logger = Logger.NULL;
private final long DEFAULT_PART_SIZE = 33554432; // 32mb
private final int MAGNITUDE_BASE = 100;
private final AWSS3BlobStore ablobstore;
private final PayloadSlicer slicer;
// calculated only once, but not from the constructor
private volatile long parts; // required number of parts with chunkSize
private volatile long chunkSize;
private volatile long remaining; // number of bytes remained for the last part
// sequentially updated values
private volatile int part;
private volatile long chunkOffset;
private volatile long copied;
@Inject
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) {
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
this.slicer = checkNotNull(slicer, "slicer");
}
protected long calculateChunkSize(long length) {
long unitPartSize = DEFAULT_PART_SIZE; // first try with default part size
long parts = length / unitPartSize;
long partSize = unitPartSize;
int magnitude = (int) (parts / MAGNITUDE_BASE);
if (magnitude > 0) {
partSize = magnitude * unitPartSize;
if (partSize > MAX_PART_SIZE) {
partSize = MAX_PART_SIZE;
unitPartSize = MAX_PART_SIZE;
}
parts = length / partSize;
}
if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
// cannot be split
unitPartSize = MIN_PART_SIZE; // take the minimum part size
parts = length / unitPartSize;
}
if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
// covering
}
long remainder = length % unitPartSize;
if (remainder == 0 && parts > 0) {
parts -= 1;
}
this.chunkSize = partSize;
this.parts = parts;
this.remaining = length - partSize * parts;
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
remaining, (remaining > MAX_PART_SIZE ? " overflow!" : ""));
return this.chunkSize;
}
protected long getParts() {
return parts;
}
protected int getNextPart() {
return ++part;
}
protected void addCopied(long copied) {
this.copied += copied;
}
protected long getNextChunkOffset() {
long next = chunkOffset;
chunkOffset += getChunkSize();
return next;
}
protected long getChunkSize() {
return chunkSize;
}
protected long getRemaining() {
return remaining;
}
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part,
Payload chunkedPart) {
String eTag = null;
try {
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
} catch (KeyNotFoundException e) {
// note that because of eventual consistency, the upload id may not be present yet
// we may wish to add this condition to the retry handler
// we may also choose to implement ListParts and wait for the uploadId to become
// available there.
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
}
return eTag;
}
@Override
public String execute(String container, Blob blob) {
String key = blob.getMetadata().getName();
calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength());
long parts = getParts();
if (parts > 0) {
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO
// md5
Map<Integer, String> etags = Maps.newHashMap();
int part;
while ((part = getNextPart()) <= getParts()) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize));
etags.put(new Integer(part), eTag);
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining));
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else {
return ablobstore.putBlob(container, blob);
}
}
}

View File

@ -27,11 +27,15 @@ import static org.jclouds.io.Payloads.newByteArrayPayload;
import static org.testng.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.http.BaseJettyTest;
import org.jclouds.http.apachehc.config.ApacheHCHttpCommandExecutorServiceModule;
import org.jclouds.io.Payload;
@ -43,6 +47,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;
import com.google.inject.Module;
@ -134,4 +139,23 @@ public class AWSS3ClientLiveTest extends S3ClientLiveTest {
returnContainer(containerName);
}
}
public void testMultipartChunkedFileStream() throws IOException, InterruptedException {
FileOutputStream fous = new FileOutputStream(new File("target/const.txt"));
ByteStreams.copy(oneHundredOneConstitutions.getInput(), fous);
fous.flush();
fous.close();
String containerName = getContainerName();
try {
BlobStore blobStore = context.getBlobStore();
blobStore.createContainerInLocation(null, containerName);
Blob blob = blobStore.blobBuilder("const.txt")
.payload(new File("target/const.txt")).build();
blobStore.putBlobMultipart(containerName, blob);
} finally {
returnContainer(containerName);
}
}
}

View File

@ -26,6 +26,7 @@ import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer;
import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
import org.jclouds.netty.config.NettyPayloadModule;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;
@ -45,7 +46,7 @@ public class AWSS3TestInitializer extends TransientBlobStoreTestInitializer {
protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion,
String app, String identity, String credential) throws IOException {
return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule,
new Log4JLoggingModule()), setupProperties(endpoint, apiversion, identity, credential));
new Log4JLoggingModule(), new NettyPayloadModule()), setupProperties(endpoint, apiversion, identity, credential));
}
}

View File

@ -30,6 +30,7 @@ import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.azure.storage.domain.BoundedSet;
import org.jclouds.azureblob.AzureBlobAsyncClient;
import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob;
import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata;
@ -42,7 +43,6 @@ import org.jclouds.azureblob.domain.BlobProperties;
import org.jclouds.azureblob.domain.ContainerProperties;
import org.jclouds.azureblob.domain.ListBlobsResponse;
import org.jclouds.azureblob.options.ListBlobsOptions;
import org.jclouds.azure.storage.domain.BoundedSet;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
@ -110,8 +110,8 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
BoundedSet<ContainerProperties> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd),
from.getNextMarker());
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), from
.getNextMarker());
}
}, service);
}
@ -243,4 +243,9 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
throw new UnsupportedOperationException("please use deleteContainer");
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -192,6 +192,19 @@ public class AzureBlobStore extends BaseBlobStore {
return sync.putBlob(container, blob2AzureBlob.apply(blob));
}
/**
* This implementation invokes {@link AzureBlobClient#putObject}
*
* @param container
* container name
* @param blob
* object
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
/**
* This implementation invokes {@link AzureBlobClient#deleteObject}
*