Merge branch 'large-blob' of https://github.com/tiborkiss/jclouds into tiborkiss-large-blob

* 'large-blob' of https://github.com/tiborkiss/jclouds:
  AWS S3 sequential multipart upload strategy, newly added files.
  AWS S3 sequential Multipart Upload strategy
This commit is contained in:
Adrian Cole 2011-03-05 21:16:07 -05:00
commit 16843d9a92
17 changed files with 721 additions and 2 deletions

View File

@ -207,6 +207,16 @@ public class AtmosBlobStore extends BaseBlobStore {
return AtmosUtils.putBlob(sync, crypto, blob2Object, container, blob); return AtmosUtils.putBlob(sync, crypto, blob2Object, container, blob);
} }
/**
* This implementation performs no multipart-specific upload: it delegates to
* {@link #putBlob(String, Blob)} with the same arguments.
* <p/>
* Since there is no etag support in atmos, we just return the path.
*
* @param container
*           container to place the blob in
* @param blob
*           blob to upload
* @return whatever {@code putBlob} returns for the same arguments
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
/** /**
* This implementation invokes {@link AtmosClient#deletePath} * This implementation invokes {@link AtmosClient#deletePath}
*/ */

View File

@ -83,7 +83,7 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider; private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
@Inject @Inject
S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync, @Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,

View File

@ -74,7 +74,7 @@ public class S3BlobStore extends BaseBlobStore {
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider; private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
@Inject @Inject
S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation, protected S3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3Client sync, @Memoized Supplier<Set<? extends Location>> locations, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
@ -225,6 +225,19 @@ public class S3BlobStore extends BaseBlobStore {
return sync.putObject(container, blob2Object.apply(blob)); return sync.putObject(container, blob2Object.apply(blob));
} }
/**
* This implementation invokes {@link S3Client#putObject} once with the whole blob; no
* multipart splitting is done at this level.
*
* @param container
*           bucket name
* @param blob
*           object
* @return the value returned by {@link S3Client#putObject}
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return sync.putObject(container, blob2Object.apply(blob));
}
/** /**
* This implementation invokes {@link S3Client#deleteObject} * This implementation invokes {@link S3Client#deleteObject}
* *

View File

@ -195,6 +195,19 @@ public class SwiftBlobStore extends BaseBlobStore {
return sync.putObject(container, blob2Object.apply(blob)); return sync.putObject(container, blob2Object.apply(blob));
} }
/**
* This implementation performs no multipart-specific upload: it delegates to
* {@link #putBlob(String, Blob)} with the same arguments.
*
* @param container
*           container name
* @param blob
*           object
* @return whatever {@code putBlob} returns for the same arguments
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
/** /**
* This implementation invokes {@link CommonSwiftClient#removeObject} * This implementation invokes {@link CommonSwiftClient#removeObject}
* *

View File

@ -200,6 +200,22 @@ public interface BlobStore {
*/ */
String putBlob(String container, Blob blob); String putBlob(String container, Blob blob);
/**
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
* using multipart strategies.
*
* @param container
*           container to place the blob.
* @param blob
*           the blob to upload; its metadata name is used as the name relative to the container.
* @return etag of the blob you uploaded, possibly null where etags are unsupported
* @throws ContainerNotFoundException
*            if the container doesn't exist
*/
String putBlobMultipart(String container, Blob blob);
/** /**
* Retrieves the metadata of a {@code Blob} at location {@code container/name} * Retrieves the metadata of a {@code Blob} at location {@code container/name}
* *

View File

@ -119,6 +119,11 @@
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
<version>1.3.9</version> <version>1.3.9</version>
</dependency> </dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -35,6 +35,7 @@ import javax.annotation.Nullable;
import org.jclouds.crypto.CryptoStreams; import org.jclouds.crypto.CryptoStreams;
import org.jclouds.io.payloads.ByteArrayPayload; import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.ChunkedFilePayload;
import org.jclouds.io.payloads.FilePayload; import org.jclouds.io.payloads.FilePayload;
import org.jclouds.io.payloads.InputStreamPayload; import org.jclouds.io.payloads.InputStreamPayload;
import org.jclouds.io.payloads.StringPayload; import org.jclouds.io.payloads.StringPayload;
@ -84,6 +85,10 @@ public class Payloads {
return new FilePayload(checkNotNull(data, "data")); return new FilePayload(checkNotNull(data, "data"));
} }
/**
 * Creates a payload that exposes a single byte range of a file.
 *
 * @param data
 *           file to read from; must not be null
 * @param part
 *           part number carried by the payload
 * @param chunkOffset
 *           offset in the file at which the range starts
 * @param chunkSize
 *           length of the range in bytes
 * @return a new {@link ChunkedFilePayload} over the given range
 * @throws NullPointerException
 *            if {@code data} is null
 */
public static ChunkedFilePayload newChunkedFilePayload(File data, int part, long chunkOffset, long chunkSize) {
   File source = checkNotNull(data, "data");
   ChunkedFilePayload chunked = new ChunkedFilePayload(source, part, chunkOffset, chunkSize);
   return chunked;
}
public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap<String, String> formParams, char... skips) { public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap<String, String> formParams, char... skips) {
return new UrlEncodedFormPayload(formParams, skips); return new UrlEncodedFormPayload(formParams, skips);
} }

View File

@ -0,0 +1,126 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.stream.ChunkedFile;
/**
 * An {@link InputStream} over a byte range of a file, read through netty's {@link ChunkedFile}
 * in chunks of {@code CHUNK_SIZE} bytes.
 * <p/>
 * An {@link IOException} raised while opening the file is not thrown from the constructor; it is
 * remembered and rethrown by the first read.
 *
 * @author Tibor Kiss
 */
public class ChunkedFileInputStream extends InputStream {

   private static final int CHUNK_SIZE = 8192;

   // null when the file could not be opened; see ex
   private ChunkedFile chunks;
   // chunk currently being drained, or null before the first read
   private ChannelBuffer chunk;

   // failure captured by the constructor, reported on read
   private IOException ex;

   /**
    * @param filename
    *           path of the file to read
    * @param offset
    *           position in the file at which reading starts
    * @param length
    *           number of bytes to expose
    */
   public ChunkedFileInputStream(String filename, long offset, long length) {
      this(new File(filename), offset, length);
   }

   /**
    * @param file
    *           file to read
    * @param offset
    *           position in the file at which reading starts
    * @param length
    *           number of bytes to expose
    */
   public ChunkedFileInputStream(File file, long offset, long length) {
      RandomAccessFile raf = null;
      try {
         raf = new RandomAccessFile(file, "r");
         this.chunks = new ChunkedFile(raf, offset, length, CHUNK_SIZE);
      } catch (IOException e) {
         // do not leak the descriptor when the file opened but ChunkedFile rejected it
         closeQuietly(raf);
         this.ex = e;
      } catch (RuntimeException e) {
         closeQuietly(raf);
         throw e;
      }
   }

   private static void closeQuietly(RandomAccessFile raf) {
      if (raf == null) {
         return;
      }
      try {
         raf.close();
      } catch (IOException ignored) {
         // the failure that brought us here is the one worth reporting
      }
   }

   /** Keeps an IOException as it is and wraps anything else, so causes are not double-wrapped. */
   private static IOException asIOException(Exception e) {
      return e instanceof IOException ? (IOException) e : new IOException(e);
   }

   /**
    * Returns the chunk to read from, fetching the next one when the current chunk is drained.
    *
    * @return a chunk, or null when no more data is available
    */
   private ChannelBuffer getChunk() throws Exception {
      if (ex != null) {
         throw ex;
      }
      if (chunk == null) {
         chunk = ChannelBuffer.class.cast(chunks.nextChunk());
      }
      if (chunk == null) {
         return null;
      }
      if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
         chunk = ChannelBuffer.class.cast(chunks.nextChunk());
         if (chunk == null || chunk.readableBytes() < 1) {
            return null;
         }
      }
      return chunk;
   }

   /**
    * Reads one byte.
    *
    * @return the byte as a value in the range 0-255, or -1 at the end of the range
    * @throws IOException
    *            if the file could not be opened or read
    */
   @Override
   public int read() throws IOException {
      try {
         ChannelBuffer current = getChunk();
         if (current == null || current.readableBytes() < 1) {
            return -1;
         }
         int readIndex = current.readerIndex();
         byte abyte = current.getByte(readIndex);
         current.readerIndex(readIndex + 1);
         // mask to 0-255: a plain cast sign-extends, so 0xFF would be reported as -1 (EOF)
         return abyte & 0xFF;
      } catch (Exception e) {
         throw asIOException(e);
      }
   }

   /**
    * Reads up to {@code len} bytes from the current chunk; never reads across a chunk boundary.
    *
    * @return the number of bytes copied into {@code b}, or -1 at the end of the range
    * @throws IOException
    *            if the file could not be opened or read
    */
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
      try {
         ChannelBuffer current = getChunk();
         if (current == null) {
            return -1;
         }
         int readable = current.readableBytes();
         if (readable < 1) {
            return -1;
         }
         if (readable > len) {
            readable = len;
         }
         int readIndex = current.readerIndex();
         current.getBytes(readIndex, b, off, readable);
         current.readerIndex(readIndex + readable);
         return readable;
      } catch (Exception e) {
         throw asIOException(e);
      }
   }

   /**
    * Closes the underlying chunked file. Does nothing when the file was never opened.
    */
   @Override
   public void close() throws IOException {
      if (chunks == null) {
         return;
      }
      try {
         chunks.close();
      } catch (Exception e) {
         throw asIOException(e);
      }
   }
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import java.io.File;
import java.io.InputStream;
/**
 * A {@link FilePayload} whose input stream covers only one byte range of the file, identified by
 * a part number.
 */
public class ChunkedFilePayload extends FilePayload {

   private final int part;
   private final long chunkOffset;
   private final long chunkSize;

   /**
    * Treats the whole file as part 1, starting at offset 0.
    *
    * @param content
    *           file backing this payload
    */
   public ChunkedFilePayload(File content) {
      this(content, 1, 0, content.length());
   }

   /**
    * @param content
    *           file backing this payload
    * @param part
    *           part number of this range
    * @param chunkOffset
    *           offset in the file at which the range starts
    * @param chunkSize
    *           length of the range in bytes
    */
   public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) {
      super(content);
      this.chunkSize = chunkSize;
      this.chunkOffset = chunkOffset;
      this.part = part;
   }

   /**
    * @return the part number given at construction
    */
   public int getPart() {
      return this.part;
   }

   /**
    * @return a new stream over the configured range of the file; every call opens a fresh stream
    */
   @Override
   public InputStream getInput() {
      File source = getRawContent();
      return new ChunkedFileInputStream(source, this.chunkOffset, this.chunkSize);
   }
}

View File

@ -22,8 +22,10 @@ package org.jclouds.aws.s3;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule;
import org.jclouds.aws.s3.config.AWSS3RestClientModule; import org.jclouds.aws.s3.config.AWSS3RestClientModule;
import org.jclouds.s3.S3ContextBuilder; import org.jclouds.s3.S3ContextBuilder;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Module; import com.google.inject.Module;
@ -37,6 +39,11 @@ public class AWSS3ContextBuilder extends S3ContextBuilder {
super(props); super(props);
} }
/**
* Adds an {@link AWSS3BlobStoreContextModule} to the list of modules.
*/
@Override
protected void addContextModule(List<Module> modules) {
modules.add(new AWSS3BlobStoreContextModule());
}
@Override @Override
protected void addClientModule(List<Module> modules) { protected void addClientModule(List<Module> modules) {
modules.add(new AWSS3RestClientModule()); modules.add(new AWSS3RestClientModule());

View File

@ -0,0 +1,67 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import org.jclouds.Constants;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.s3.blobstore.S3AsyncBlobStore;
import org.jclouds.s3.blobstore.functions.BlobToObject;
import org.jclouds.s3.blobstore.functions.BucketToResourceList;
import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
/**
* AWS S3 flavour of {@link S3AsyncBlobStore}. The constructor takes the AWS-specific client
* types ({@link AWSS3AsyncClient}, {@link AWSS3Client}) and passes every argument straight
* through to the superclass; nothing else is overridden here.
*
* @author Tibor Kiss
*/
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
// injected constructor: all arguments are forwarded unchanged, in the same order
@Inject
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3AsyncClient async, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions,
blob2Object, object2BlobMd, fetchBlobMetadataProvider);
}
}

View File

@ -0,0 +1,76 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Provider;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.functions.BlobToObject;
import org.jclouds.s3.blobstore.functions.BucketToResourceList;
import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
/**
 * Provides AWS S3 specific extensions: {@link #putBlobMultipart} is routed to a
 * {@link MultipartUploadStrategy} instead of the plain put inherited from {@link S3BlobStore}.
 *
 * @author Tibor Kiss
 */
public class AWSS3BlobStore extends S3BlobStore {

   // created once in the constructor and reused for every putBlobMultipart call
   private final MultipartUploadStrategy multipartUploadStrategy;

   @Inject
   AWSS3BlobStore(BlobStoreContext context,
            BlobUtils blobUtils,
            Supplier<Location> defaultLocation,
            @Memoized Supplier<Set<? extends Location>> locations,
            AWSS3Client sync,
            BucketToResourceMetadata bucket2ResourceMd,
            ContainerToBucketListOptions container2BucketListOptions,
            BucketToResourceList bucket2ResourceList,
            ObjectToBlob object2Blob,
            BlobToHttpGetOptions blob2ObjectGetOptions,
            BlobToObject blob2Object,
            ObjectToBlobMetadata object2BlobMd,
            Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
      super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd,
               container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions,
               blob2Object, object2BlobMd, fetchBlobMetadataProvider);
      this.multipartUploadStrategy = new SequentialMultipartUploadStrategy(this);
   }

   /**
    * Uploads the blob through the multipart upload strategy.
    *
    * @param container
    *           bucket name
    * @param blob
    *           blob to upload
    * @return the value returned by {@link MultipartUploadStrategy#execute}
    */
   @Override
   public String putBlobMultipart(String container, Blob blob) {
      String result = this.multipartUploadStrategy.execute(container, blob);
      return result;
   }
}

View File

@ -0,0 +1,59 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.config;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobRequestSigner;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.attr.ConsistencyModel;
import org.jclouds.blobstore.config.BlobStoreMapModule;
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
import org.jclouds.location.config.RegionsLocationModule;
import org.jclouds.s3.blobstore.S3BlobRequestSigner;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
/**
* Guice module for the AWS S3 blob store context. It binds {@link AsyncBlobStore} to
* {@link AWSS3AsyncBlobStore} and {@link BlobStore} to {@link AWSS3BlobStore}, both as
* singletons, and types the {@link BlobStoreContext} on the AWS clients.
*
* @author Tibor Kiss
*/
public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
/**
* Declares all bindings itself; {@code super.configure()} is not called.
*/
@Override
protected void configure() {
install(new BlobStoreMapModule());
install(new RegionsLocationModule());
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
// AWS-specific blob store implementations
bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
}).in(Scopes.SINGLETON);
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
// not defined in this class; presumably inherited from S3BlobStoreContextModule - confirm
bindBucketLocationStrategy();
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.strategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.domain.Blob;
import com.google.inject.ImplementedBy;
/**
 * Strategy for uploading a blob in multiple parts, together with the limits that apply to such
 * uploads.
 *
 * @see <a href="http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?qfacts.html" />
 *
 * @author Tibor Kiss
 */
@ImplementedBy(SequentialMultipartUploadStrategy.class)
public interface MultipartUploadStrategy {

   /** Maximum number of parts per upload. */
   int MAX_NUMBER_OF_PARTS = 10000;

   /** Maximum number of parts returned for a list parts request. */
   int MAX_LIST_PARTS_RETURNED = 1000;

   /** Maximum number of multipart uploads returned in a list multipart uploads request. */
   int MAX_LIST_MPU_RETURNED = 1000;

   /** Smallest part size, 5 MB; only the last part can be smaller. */
   long MIN_PART_SIZE = 5L * 1024 * 1024;

   /** Largest part size, 5 GB. */
   long MAX_PART_SIZE = 5L * 1024 * 1024 * 1024;

   /**
    * Uploads the blob into the container.
    *
    * @param container
    *           container to upload into
    * @param blob
    *           blob to upload
    * @return the result of the upload, as reported by the implementation
    */
   String execute(String container, Blob blob);
}

View File

@ -0,0 +1,185 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.aws.s3.blobstore.strategy.internal;
import static org.jclouds.io.Payloads.newChunkedFilePayload;
import java.io.File;
import java.util.Map;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.io.Payload;
import org.jclouds.io.payloads.FilePayload;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import com.google.common.collect.Maps;
/**
 * Provides a sequential multipart upload strategy.
 *
 * The file partitioning algorithm:
 *
 * The default partition size we choose is 32mb.
 * A multiple of this default partition size is used.
 * The number of parts first grows to a chosen magnitude (for example 100 parts),
 * then it grows the partition size instead of number of partitions.
 * When we reached the maximum part size, then again it starts to grow the number of partitions.
 * <p/>
 * {@link #execute} keeps its per-upload progress (part number, offset) in local variables, so one
 * instance can be used for any number of uploads.
 *
 * @author Tibor Kiss
 */
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {

   private static final long DEFAULT_PART_SIZE = 33554432; // 32mb
   private static final int MAGNITUDE_BASE = 100;

   private final AWSS3BlobStore ablobstore;

   // results of the most recent calculateChunkSize call
   private volatile long parts; // required number of parts with chunkSize
   private volatile long chunkSize;
   private volatile long remaining; // number of bytes remained for the last part

   // cursor behind getNextPart/getNextChunkOffset/addCopied; execute does not rely on it
   private volatile int part;
   private volatile long chunkOffset;
   private volatile long copied;

   public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) {
      this.ablobstore = ablobstore;
   }

   /**
    * Partitions {@code length} bytes and stores the outcome, readable afterwards through
    * {@link #getParts()}, {@link #getChunkSize()} and {@link #getRemaining()}.
    *
    * @param length
    *           total number of bytes to upload
    * @return the chosen part size
    */
   protected long calculateChunkSize(long length) {
      long unitPartSize = DEFAULT_PART_SIZE; // first try with default part size
      long parts = length / unitPartSize;
      long partSize = unitPartSize;
      int magnitude = (int) (parts / MAGNITUDE_BASE);
      if (magnitude > 0) {
         partSize = magnitude * unitPartSize;
         if (partSize > MAX_PART_SIZE) {
            partSize = MAX_PART_SIZE;
            unitPartSize = MAX_PART_SIZE;
         }
         parts = length / partSize;
      }
      if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or cannot be split
         unitPartSize = MIN_PART_SIZE; // take the minimum part size
         parts = length / unitPartSize;
      }
      if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
         parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not covering
      }
      // NOTE(review): the remainder is taken against unitPartSize, not partSize; these differ
      // once magnitude > 1. Left as is - confirm the intended partitioning before changing.
      long remainder = length % unitPartSize;
      if (remainder == 0 && parts > 0) {
         // keep a non-empty final part: the last full part is uploaded as "remaining"
         parts -= 1;
      }
      this.chunkSize = partSize;
      this.parts = parts;
      this.remaining = length - partSize * parts;
      // TODO(review): debug output on stdout; route through the project logger
      System.out.println(" " + length + " bytes partitioned in " + parts
               + " parts of part size: " + chunkSize + ", remaining: "
               + remaining + (remaining > MAX_PART_SIZE ? " overflow!" : ""));
      return this.chunkSize;
   }

   protected long getParts() {
      return parts;
   }

   protected int getNextPart() {
      return ++part;
   }

   protected void addCopied(long copied) {
      this.copied += copied;
   }

   protected long getNextChunkOffset() {
      long next = chunkOffset;
      chunkOffset += getChunkSize();
      return next;
   }

   protected long getChunkSize() {
      return chunkSize;
   }

   protected long getRemaining() {
      return remaining;
   }

   /**
    * Uploads one byte range of the file as the given part and returns the etag of that part.
    */
   private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId,
            int part, File file, long chunkOffset, long chunkSize) {
      Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize);
      chunkedPart.getContentMetadata().setContentLength(chunkSize);
      // TODO: content MD5 of the part is not set
      String eTag = null;
      try {
         eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
      } catch (KeyNotFoundException e) {
         // note that because of eventual consistency, the upload id may not be present yet
         // we may wish to add this condition to the retry handler
         // we may also choose to implement ListParts and wait for the uploadId to become
         // available there.
         eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
      }
      return eTag;
   }

   /** Aborts the upload without letting a cleanup failure hide the original error. */
   private static void abortQuietly(AWSS3Client client, String container, String key, String uploadId) {
      try {
         client.abortMultipartUpload(container, key, uploadId);
      } catch (RuntimeException ignored) {
         // the upload failure that triggered the abort is the one the caller needs to see
      }
   }

   /**
    * Uploads the blob. A file-backed blob that partitions into at least one full part is sent as
    * a multipart upload, one part after the other; anything else goes through a regular
    * {@code putBlob}.
    * <p/>
    * If a part upload or the completion fails, the multipart upload is aborted before the
    * failure is rethrown.
    *
    * @param container
    *           container (bucket) to upload into
    * @param blob
    *           blob to upload
    * @return the value returned by completing the multipart upload, or by {@code putBlob} when
    *         no multipart upload is performed
    */
   @Override
   public String execute(String container, Blob blob) {
      Payload payload = blob.getPayload();
      if (!(payload instanceof FilePayload)) {
         return ablobstore.putBlob(container, blob);
      }
      String key = blob.getMetadata().getName();
      File file = FilePayload.class.cast(payload).getRawContent();

      // calculateChunkSize publishes its result through fields; take a consistent snapshot so
      // that this upload is not affected by another execute call on the same instance
      final long fullParts;
      final long partSize;
      final long lastPartSize;
      synchronized (this) {
         calculateChunkSize(file.length());
         fullParts = getParts();
         partSize = getChunkSize();
         lastPartSize = getRemaining();
      }
      if (fullParts <= 0) {
         return ablobstore.putBlob(container, blob);
      }

      AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
      String uploadId = client.initiateMultipartUpload(container,
               ObjectMetadataBuilder.create().key(key).build()); // TODO md5
      try {
         // sorted by part number: S3 expects the parts of a completion request in ascending order
         Map<Integer, String> etags = Maps.newTreeMap();
         // progress is local, so every upload starts at part 1 / offset 0
         int partNumber = 0;
         long offset = 0;
         while (partNumber < fullParts) {
            partNumber++;
            String eTag = prepareUploadPart(client, container, key, uploadId, partNumber, file, offset, partSize);
            etags.put(partNumber, eTag);
            offset += partSize;
         }
         if (lastPartSize > 0) {
            partNumber++;
            String eTag = prepareUploadPart(client, container, key, uploadId, partNumber, file, offset,
                     lastPartSize);
            etags.put(partNumber, eTag);
         }
         return client.completeMultipartUpload(container, key, uploadId, etags);
      } catch (RuntimeException e) {
         // do not leave already uploaded parts behind
         abortQuietly(client, container, key, uploadId);
         throw e;
      }
   }
}

View File

@ -27,11 +27,15 @@ import static org.jclouds.io.Payloads.newByteArrayPayload;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.http.BaseJettyTest; import org.jclouds.http.BaseJettyTest;
import org.jclouds.http.apachehc.config.ApacheHCHttpCommandExecutorServiceModule; import org.jclouds.http.apachehc.config.ApacheHCHttpCommandExecutorServiceModule;
import org.jclouds.io.Payload; import org.jclouds.io.Payload;
@ -43,6 +47,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.inject.Module; import com.google.inject.Module;
@ -134,4 +139,23 @@ public class AWSS3ClientLiveTest extends S3ClientLiveTest {
returnContainer(containerName); returnContainer(containerName);
} }
} }
/**
 * Writes the test data to a local file and uploads it with {@code putBlobMultipart}. The test
 * passes when no exception is thrown.
 */
public void testMultipartChunkedFileStream() throws IOException, InterruptedException {
   File file = new File("target/const.txt");
   // close both streams even when the copy fails
   InputStream in = oneHundredOneConstitutions.getInput();
   try {
      FileOutputStream fous = new FileOutputStream(file);
      try {
         ByteStreams.copy(in, fous);
         fous.flush();
      } finally {
         fous.close();
      }
   } finally {
      in.close();
   }

   String containerName = getContainerName();
   try {
      BlobStore blobStore = context.getBlobStore();
      blobStore.createContainerInLocation(null, containerName);
      Blob blob = blobStore.blobBuilder("const.txt").payload(file).build();
      blobStore.putBlobMultipart(containerName, blob);
   } finally {
      returnContainer(containerName);
   }
}
} }

View File

@ -192,6 +192,19 @@ public class AzureBlobStore extends BaseBlobStore {
return sync.putBlob(container, blob2AzureBlob.apply(blob)); return sync.putBlob(container, blob2AzureBlob.apply(blob));
} }
/**
* This implementation performs no multipart-specific upload: it delegates to
* {@link #putBlob(String, Blob)} with the same arguments.
*
* @param container
*           container name
* @param blob
*           object
* @return whatever {@code putBlob} returns for the same arguments
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
/** /**
* This implementation invokes {@link AzureBlobClient#deleteObject} * This implementation invokes {@link AzureBlobClient#deleteObject}
* *