mirror of https://github.com/apache/jclouds.git
Merge branch 'large-blob' of https://github.com/jclouds/jclouds into jclouds-large-blob
This commit is contained in:
commit
35d54139c1
|
@ -250,4 +250,9 @@ public class AtmosAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
return async.deletePath(container + "/" + key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -660,4 +660,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
|
||||
return eTag;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -84,12 +84,12 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
|
|||
|
||||
@Inject
|
||||
protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
|
||||
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
|
||||
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
|
||||
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
|
||||
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
super(context, blobUtils, service, defaultLocation, locations);
|
||||
this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
|
||||
this.async = checkNotNull(async, "async");
|
||||
|
@ -109,11 +109,11 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
|
|||
@Override
|
||||
public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
|
||||
return Futures.compose(async.listOwnedBuckets(),
|
||||
new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
|
||||
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) {
|
||||
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null);
|
||||
}
|
||||
}, service);
|
||||
new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
|
||||
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) {
|
||||
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null);
|
||||
}
|
||||
}, service);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -153,9 +153,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
|
|||
ListBucketOptions httpOptions = container2BucketListOptions.apply(options);
|
||||
ListenableFuture<ListBucketResponse> returnVal = async.listBucket(container, httpOptions);
|
||||
ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, bucket2ResourceList,
|
||||
service);
|
||||
service);
|
||||
return (options.isDetailed()) ? Futures.compose(list,
|
||||
fetchBlobMetadataProvider.get().setContainerName(container), service) : list;
|
||||
fetchBlobMetadataProvider.get().setContainerName(container), service) : list;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -238,4 +238,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
|
|||
return async.deleteObject(container, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -55,12 +55,16 @@ public class S3BlobStoreContextModule extends AbstractModule {
|
|||
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
|
||||
bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON);
|
||||
bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON);
|
||||
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
|
||||
}).in(Scopes.SINGLETON);
|
||||
bindContext();
|
||||
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
|
||||
bindBucketLocationStrategy();
|
||||
}
|
||||
|
||||
protected void bindContext() {
|
||||
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
|
||||
}).in(Scopes.SINGLETON);
|
||||
}
|
||||
|
||||
protected void bindBucketLocationStrategy() {
|
||||
bind(new TypeLiteral<Function<BucketMetadata, Location>>() {
|
||||
}).to(LocationFromBucketLocation.class);
|
||||
|
|
|
@ -83,13 +83,13 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
|
||||
@Inject
|
||||
SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async,
|
||||
ContainerToResourceMetadata container2ResourceMd,
|
||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async,
|
||||
ContainerToResourceMetadata container2ResourceMd,
|
||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
super(context, blobUtils, service, defaultLocation, locations);
|
||||
this.sync = sync;
|
||||
this.async = async;
|
||||
|
@ -109,12 +109,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
@Override
|
||||
public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
|
||||
return Futures.compose(async.listContainers(),
|
||||
new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
|
||||
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
|
||||
Set<ContainerMetadata> from) {
|
||||
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null);
|
||||
}
|
||||
}, service);
|
||||
new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
|
||||
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<ContainerMetadata> from) {
|
||||
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null);
|
||||
}
|
||||
}, service);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -145,12 +144,12 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
@Override
|
||||
public ListenableFuture<PageSet<? extends StorageMetadata>> list(String container, ListContainerOptions options) {
|
||||
org.jclouds.openstack.swift.options.ListContainerOptions httpOptions = container2ContainerListOptions
|
||||
.apply(options);
|
||||
.apply(options);
|
||||
ListenableFuture<PageSet<ObjectInfo>> returnVal = async.listObjects(container, httpOptions);
|
||||
ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, container2ResourceList,
|
||||
service);
|
||||
service);
|
||||
return options.isDetailed() ? Futures.compose(list, fetchBlobMetadataProvider.get().setContainerName(container),
|
||||
service) : list;
|
||||
service) : list;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -177,14 +176,14 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
@Override
|
||||
public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) {
|
||||
return Futures.compose(async.getObjectInfo(container, key),
|
||||
new Function<MutableObjectInfoWithMetadata, BlobMetadata>() {
|
||||
new Function<MutableObjectInfoWithMetadata, BlobMetadata>() {
|
||||
|
||||
@Override
|
||||
public BlobMetadata apply(MutableObjectInfoWithMetadata from) {
|
||||
return object2BlobMd.apply(from);
|
||||
}
|
||||
@Override
|
||||
public BlobMetadata apply(MutableObjectInfoWithMetadata from) {
|
||||
return object2BlobMd.apply(from);
|
||||
}
|
||||
|
||||
}, service);
|
||||
}, service);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -235,4 +234,9 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
return !sync.containerExists(container);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.jclouds.blobstore.options.GetOptions;
|
|||
import org.jclouds.blobstore.options.ListContainerOptions;
|
||||
import org.jclouds.domain.Location;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
|
@ -126,6 +127,12 @@ public interface AsyncBlobStore {
|
|||
*/
|
||||
ListenableFuture<String> putBlob(String container, Blob blob);
|
||||
|
||||
/**
|
||||
* @see BlobStore#putBlobMultipart
|
||||
*/
|
||||
@Beta
|
||||
ListenableFuture<String> putBlobMultipart(String container, Blob blob);
|
||||
|
||||
/**
|
||||
* @see BlobStore#blobMetadata
|
||||
*/
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.jclouds.blobstore.options.GetOptions;
|
|||
import org.jclouds.blobstore.options.ListContainerOptions;
|
||||
import org.jclouds.domain.Location;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
|
||||
/**
|
||||
* Synchronous access to a BlobStore such as Amazon S3
|
||||
*
|
||||
|
@ -48,17 +50,18 @@ public interface BlobStore {
|
|||
|
||||
/**
|
||||
* creates a new blob with the specified name.
|
||||
*
|
||||
* @see #blobBuilder
|
||||
*/
|
||||
@Deprecated
|
||||
Blob newBlob(String name);
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @return builder for creating new {@link Blob}s
|
||||
*/
|
||||
BlobBuilder blobBuilder(String name);
|
||||
|
||||
|
||||
/**
|
||||
* The get locations command returns all the valid locations for containers. A location has a
|
||||
* scope, which is typically region or zone. A region is a general area, like eu-west, where a
|
||||
|
@ -199,7 +202,7 @@ public interface BlobStore {
|
|||
* if the container doesn't exist
|
||||
*/
|
||||
String putBlob(String container, Blob blob);
|
||||
|
||||
|
||||
/**
|
||||
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
|
||||
* using multipart strategies.
|
||||
|
@ -214,6 +217,7 @@ public interface BlobStore {
|
|||
* @throws ContainerNotFoundException
|
||||
* if the container doesn't exist
|
||||
*/
|
||||
@Beta
|
||||
String putBlobMultipart(String container, Blob blob);
|
||||
|
||||
/**
|
||||
|
|
|
@ -675,4 +675,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
return containerToLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
// TODO implement
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -119,11 +119,6 @@
|
|||
<artifactId>jsr305</artifactId>
|
||||
<version>1.3.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<version>3.2.4.Final</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
|
||||
package org.jclouds.io;
|
||||
|
||||
import org.jclouds.io.internal.BasePayloadSlicer;
|
||||
|
||||
import com.google.inject.ImplementedBy;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@ImplementedBy(BasePayloadSlicer.class)
|
||||
public interface PayloadSlicer {
|
||||
/**
|
||||
* Returns a {@link Payload} that returns input streams from the an underlying payload, where
|
||||
* each stream starts at the given offset and is limited to the specified number of bytes.
|
||||
*
|
||||
* @param input
|
||||
* the payload from which to get the raw streams
|
||||
* @param offset
|
||||
* the offset in bytes into the underlying stream where the returned streams will start
|
||||
* @param length
|
||||
* the maximum length of the returned streams
|
||||
* @throws IllegalArgumentException
|
||||
* if offset or length are negative
|
||||
*/
|
||||
Payload slice(Payload input, long offset, long length);
|
||||
}
|
|
@ -35,7 +35,6 @@ import javax.annotation.Nullable;
|
|||
|
||||
import org.jclouds.crypto.CryptoStreams;
|
||||
import org.jclouds.io.payloads.ByteArrayPayload;
|
||||
import org.jclouds.io.payloads.ChunkedFilePayload;
|
||||
import org.jclouds.io.payloads.FilePayload;
|
||||
import org.jclouds.io.payloads.InputStreamPayload;
|
||||
import org.jclouds.io.payloads.StringPayload;
|
||||
|
@ -84,10 +83,6 @@ public class Payloads {
|
|||
public static FilePayload newFilePayload(File data) {
|
||||
return new FilePayload(checkNotNull(data, "data"));
|
||||
}
|
||||
|
||||
public static ChunkedFilePayload newChunkedFilePayload(File data, int part, long chunkOffset, long chunkSize) {
|
||||
return new ChunkedFilePayload(checkNotNull(data, "data"), part, chunkOffset, chunkSize);
|
||||
}
|
||||
|
||||
public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap<String, String> formParams, char... skips) {
|
||||
return new UrlEncodedFormPayload(formParams, skips);
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
|
||||
package org.jclouds.io.internal;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.io.InputSuppliers;
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.io.PayloadSlicer;
|
||||
import org.jclouds.io.payloads.BaseMutableContentMetadata;
|
||||
import org.jclouds.io.payloads.InputStreamSupplierPayload;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.io.ByteStreams;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@Singleton
|
||||
public class BasePayloadSlicer implements PayloadSlicer {
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Payload slice(Payload input, long offset, long length) {
|
||||
checkNotNull(input);
|
||||
checkArgument(offset >= 0, "offset is negative");
|
||||
checkArgument(length >= 0, "length is negative");
|
||||
Payload returnVal;
|
||||
if (input.getRawContent() instanceof File) {
|
||||
returnVal = doSlice((File) input.getRawContent(), offset, length);
|
||||
} else if (input.getRawContent() instanceof String) {
|
||||
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
|
||||
} else if (input.getRawContent() instanceof byte[]) {
|
||||
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
|
||||
} else {
|
||||
returnVal = doSlice(input.getInput(), offset, length);
|
||||
}
|
||||
return copyMetadataAndSetLength(input, returnVal, length);
|
||||
}
|
||||
|
||||
protected Payload doSlice(Payload content, long offset, long length) {
|
||||
return new InputStreamSupplierPayload(ByteStreams.slice(content, offset, length));
|
||||
}
|
||||
|
||||
protected Payload doSlice(String content, long offset, long length) {
|
||||
return doSlice(content.getBytes(), offset, length);
|
||||
}
|
||||
|
||||
protected Payload doSlice(File content, long offset, long length) {
|
||||
try {
|
||||
return doSlice(new FileInputStream(content), offset, length);
|
||||
} catch (FileNotFoundException e) {
|
||||
Throwables.propagate(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected Payload doSlice(InputStream content, long offset, long length) {
|
||||
return new InputStreamSupplierPayload(ByteStreams.slice(InputSuppliers.of(content), offset, length));
|
||||
}
|
||||
|
||||
protected Payload doSlice(byte[] content, long offset, long length) {
|
||||
Payload returnVal;
|
||||
checkArgument(offset <= Integer.MAX_VALUE, "offset is too big for an array");
|
||||
checkArgument(length <= Integer.MAX_VALUE, "length is too big for an array");
|
||||
returnVal = new InputStreamSupplierPayload(
|
||||
ByteStreams.newInputStreamSupplier(content, (int) offset, (int) length));
|
||||
return returnVal;
|
||||
}
|
||||
|
||||
protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, long length) {
|
||||
returnVal.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata()
|
||||
.toBuilder().contentLength(length).contentMD5(null).build()));
|
||||
return returnVal;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
|
||||
package org.jclouds.io.payloads;
|
||||
|
||||
import static com.google.common.io.Closeables.closeQuietly;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.io.InputSupplier;
|
||||
|
||||
/**
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class InputStreamSupplierPayload extends BasePayload<InputSupplier<? extends InputStream>> {
|
||||
private List<InputStream> toClose = Lists.newArrayList();
|
||||
|
||||
public InputStreamSupplierPayload(InputSupplier<? extends InputStream> content) {
|
||||
super(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public InputStream getInput() {
|
||||
try {
|
||||
InputStream returnVal = content.getInput();
|
||||
toClose.add(returnVal);
|
||||
return returnVal;
|
||||
} catch (IOException e) {
|
||||
Throwables.propagate(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* if we created the stream, then it is already consumed on close.
|
||||
*/
|
||||
@Override
|
||||
public void release() {
|
||||
if (toClose.size() > 0)
|
||||
for (InputStream content = toClose.remove(0); toClose.size() > 0; content = toClose.remove(0))
|
||||
closeQuietly(content);
|
||||
}
|
||||
|
||||
}
|
Binary file not shown.
|
@ -0,0 +1,82 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
|
||||
|
||||
Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||
|
||||
====================================================================
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
====================================================================
|
||||
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.jclouds</groupId>
|
||||
<artifactId>jclouds-project</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<relativePath>../../project/pom.xml</relativePath>
|
||||
</parent>
|
||||
<groupId>org.jclouds.driver</groupId>
|
||||
<artifactId>jclouds-netty</artifactId>
|
||||
<name>jclouds netty payload module</name>
|
||||
<description>jclouds netty payload module</description>
|
||||
|
||||
<!-- bootstrapping: need to fetch the project POM -->
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>jclouds-sona-snapshots-nexus</id>
|
||||
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
|
||||
<releases>
|
||||
<enabled>false</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>jboss-public-releases</id>
|
||||
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.jclouds</groupId>
|
||||
<artifactId>jclouds-core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jclouds</groupId>
|
||||
<artifactId>jclouds-core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<version>3.2.4.Final</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package org.jclouds.netty.config;
|
||||
|
||||
import org.jclouds.io.PayloadSlicer;
|
||||
import org.jclouds.netty.io.NettyPayloadSlicer;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*
|
||||
*/
|
||||
public class NettyPayloadModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(PayloadSlicer.class).to(NettyPayloadSlicer.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -17,10 +17,9 @@
|
|||
* ====================================================================
|
||||
*/
|
||||
|
||||
package org.jclouds.io.payloads;
|
||||
package org.jclouds.netty.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
|
@ -30,24 +29,24 @@ import org.jboss.netty.handler.stream.ChunkedFile;
|
|||
|
||||
/**
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
* @author Tibor Kiss
|
||||
*/
|
||||
public class ChunkedFileInputStream extends InputStream {
|
||||
|
||||
|
||||
private static final int CHUNK_SIZE = 8192;
|
||||
|
||||
|
||||
private ChunkedFile chunks;
|
||||
private ChannelBuffer chunk;
|
||||
|
||||
|
||||
private IOException ex;
|
||||
|
||||
public ChunkedFileInputStream(String filename, long offset, long length) {
|
||||
this(new File(filename), offset, length);
|
||||
}
|
||||
|
||||
|
||||
public ChunkedFileInputStream(File file, long offset, long length) {
|
||||
try {
|
||||
this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE);
|
||||
|
@ -62,9 +61,9 @@ public class ChunkedFileInputStream extends InputStream {
|
|||
}
|
||||
if (chunk == null) {
|
||||
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
|
||||
}
|
||||
}
|
||||
if (chunk != null) {
|
||||
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
|
||||
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
|
||||
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
|
||||
if (chunk.readableBytes() < 1) {
|
||||
return null;
|
||||
|
@ -75,7 +74,7 @@ public class ChunkedFileInputStream extends InputStream {
|
|||
}
|
||||
return chunk;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
try {
|
||||
|
@ -87,15 +86,15 @@ public class ChunkedFileInputStream extends InputStream {
|
|||
int readIndex = chunk.readerIndex();
|
||||
byte abyte = chunk.getByte(readIndex);
|
||||
chunk.readerIndex(readIndex + 1);
|
||||
return (int)abyte;
|
||||
return (int) abyte;
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
try {
|
||||
try {
|
||||
ChannelBuffer chunk = getChunk();
|
||||
if (chunk == null)
|
||||
return -1;
|
||||
|
@ -111,7 +110,7 @@ public class ChunkedFileInputStream extends InputStream {
|
|||
return readable;
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
|
@ -17,34 +17,26 @@
|
|||
* ====================================================================
|
||||
*/
|
||||
|
||||
package org.jclouds.io.payloads;
|
||||
package org.jclouds.netty.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class ChunkedFilePayload extends FilePayload {
|
||||
import javax.inject.Singleton;
|
||||
|
||||
private int part;
|
||||
private long chunkOffset;
|
||||
private long chunkSize;
|
||||
|
||||
public ChunkedFilePayload(File content) {
|
||||
this(content, 1, 0, content.length());
|
||||
}
|
||||
|
||||
public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) {
|
||||
super(content);
|
||||
this.part = part;
|
||||
this.chunkOffset = chunkOffset;
|
||||
this.chunkSize = chunkSize;
|
||||
}
|
||||
|
||||
public int getPart() {
|
||||
return part;
|
||||
}
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.io.Payloads;
|
||||
import org.jclouds.io.internal.BasePayloadSlicer;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@Singleton
|
||||
public class NettyPayloadSlicer extends BasePayloadSlicer {
|
||||
|
||||
@Override
|
||||
public InputStream getInput() {
|
||||
return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize);
|
||||
protected Payload doSlice(File content, long offset, long length) {
|
||||
return Payloads.newInputStreamPayload(new ChunkedFileInputStream(content, offset, length));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -38,6 +38,7 @@
|
|||
<module>bouncycastle</module>
|
||||
<module>log4j</module>
|
||||
<module>jsch</module>
|
||||
<module>netty</module>
|
||||
<module>enterprise</module>
|
||||
</modules>
|
||||
</project>
|
||||
|
|
|
@ -42,6 +42,21 @@
|
|||
<test.aws-s3.credential>${test.aws.credential}</test.aws-s3.credential>
|
||||
</properties>
|
||||
|
||||
<!-- temporary -->
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>jboss-public-releases</id>
|
||||
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.jclouds.api</groupId>
|
||||
|
@ -94,6 +109,18 @@
|
|||
<version>1.2.16</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jclouds.driver</groupId>
|
||||
<artifactId>jclouds-netty</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<version>3.2.4.Final</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Properties;
|
|||
import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule;
|
||||
import org.jclouds.aws.s3.config.AWSS3RestClientModule;
|
||||
import org.jclouds.s3.S3ContextBuilder;
|
||||
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
|
||||
|
||||
import com.google.inject.Module;
|
||||
|
||||
|
|
|
@ -29,7 +29,9 @@ import javax.inject.Provider;
|
|||
import org.jclouds.Constants;
|
||||
import org.jclouds.aws.s3.AWSS3AsyncClient;
|
||||
import org.jclouds.aws.s3.AWSS3Client;
|
||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||
import org.jclouds.blobstore.BlobStoreContext;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
|
||||
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
|
||||
import org.jclouds.blobstore.util.BlobUtils;
|
||||
|
@ -44,13 +46,17 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlob;
|
|||
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @author Tibor Kiss
|
||||
*/
|
||||
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
|
||||
|
||||
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
||||
|
||||
@Inject
|
||||
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
|
@ -58,10 +64,19 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
|
|||
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
|
||||
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
|
||||
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions,
|
||||
blob2Object, object2BlobMd, fetchBlobMetadataProvider);
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
||||
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
|
||||
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object,
|
||||
object2BlobMd, fetchBlobMetadataProvider);
|
||||
this.multipartUploadStrategy = multipartUploadStrategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
// TODO: make this better
|
||||
// need to use a provider if the strategy object is stateful
|
||||
return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import javax.inject.Provider;
|
|||
|
||||
import org.jclouds.aws.s3.AWSS3Client;
|
||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
|
||||
import org.jclouds.blobstore.BlobStoreContext;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
|
||||
|
@ -45,32 +44,30 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
|
|||
import com.google.common.base.Supplier;
|
||||
|
||||
/**
|
||||
* Proived AWS S3 specific extensions.
|
||||
*
|
||||
* Proived AWS S3 specific extensions.
|
||||
*
|
||||
* @author Tibor Kiss
|
||||
*/
|
||||
public class AWSS3BlobStore extends S3BlobStore {
|
||||
|
||||
private MultipartUploadStrategy multipartUploadStrategy;
|
||||
|
||||
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
||||
|
||||
@Inject
|
||||
AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
|
||||
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
|
||||
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
|
||||
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
|
||||
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||
super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions,
|
||||
bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd,
|
||||
fetchBlobMetadataProvider);
|
||||
multipartUploadStrategy = new SequentialMultipartUploadStrategy(this);
|
||||
this.multipartUploadStrategy = multipartUploadStrategy;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob)
|
||||
*/
|
||||
@Override
|
||||
public String putBlobMultipart(String container, Blob blob) {
|
||||
return multipartUploadStrategy.execute(container, blob);
|
||||
// need to use a provider if the strategy object is stateful
|
||||
return multipartUploadStrategy.get().execute(container, blob);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,15 +23,12 @@ import org.jclouds.aws.s3.AWSS3AsyncClient;
|
|||
import org.jclouds.aws.s3.AWSS3Client;
|
||||
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
|
||||
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
||||
import org.jclouds.blobstore.AsyncBlobStore;
|
||||
import org.jclouds.blobstore.BlobRequestSigner;
|
||||
import org.jclouds.blobstore.BlobStore;
|
||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
|
||||
import org.jclouds.blobstore.BlobStoreContext;
|
||||
import org.jclouds.blobstore.attr.ConsistencyModel;
|
||||
import org.jclouds.blobstore.config.BlobStoreMapModule;
|
||||
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
|
||||
import org.jclouds.location.config.RegionsLocationModule;
|
||||
import org.jclouds.s3.blobstore.S3BlobRequestSigner;
|
||||
import org.jclouds.s3.blobstore.S3AsyncBlobStore;
|
||||
import org.jclouds.s3.blobstore.S3BlobStore;
|
||||
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
|
||||
|
||||
import com.google.inject.Scopes;
|
||||
|
@ -39,21 +36,23 @@ import com.google.inject.TypeLiteral;
|
|||
|
||||
/**
|
||||
*
|
||||
*
|
||||
*
|
||||
* @author Tibor Kiss
|
||||
*/
|
||||
public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
install(new BlobStoreMapModule());
|
||||
install(new RegionsLocationModule());
|
||||
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
|
||||
bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
|
||||
bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
|
||||
super.configure();
|
||||
bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
|
||||
bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
|
||||
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void bindContext() {
|
||||
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
|
||||
}).in(Scopes.SINGLETON);
|
||||
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
|
||||
bindBucketLocationStrategy();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,19 +19,23 @@
|
|||
|
||||
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
||||
|
||||
import static org.jclouds.io.Payloads.newChunkedFilePayload;
|
||||
|
||||
import java.io.File;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.jclouds.aws.s3.AWSS3Client;
|
||||
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||
import org.jclouds.blobstore.KeyNotFoundException;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.io.payloads.FilePayload;
|
||||
import org.jclouds.io.PayloadSlicer;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.s3.domain.ObjectMetadataBuilder;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -40,35 +44,39 @@ import com.google.common.collect.Maps;
|
|||
* Provides a sequential multipart upload strategy.
|
||||
*
|
||||
* The file partitioning algorithm:
|
||||
*
|
||||
* The default partition size we choose is 32mb.
|
||||
* A multiple of this default partition size is used.
|
||||
* The number of parts first grows to a chosen magnitude (for example 100 parts),
|
||||
* then it grows the partition size instead of number of partitions.
|
||||
* When we reached the maximum part size, then again it starts to grow the number of partitions.
|
||||
*
|
||||
*
|
||||
* The default partition size we choose is 32mb. A multiple of this default partition size is used.
|
||||
* The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the
|
||||
* partition size instead of number of partitions. When we reached the maximum part size, then again
|
||||
* it starts to grow the number of partitions.
|
||||
*
|
||||
* @author Tibor Kiss
|
||||
*/
|
||||
public class SequentialMultipartUploadStrategy implements
|
||||
MultipartUploadStrategy {
|
||||
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
|
||||
@Resource
|
||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
private final long DEFAULT_PART_SIZE = 33554432; // 32mb
|
||||
private final long DEFAULT_PART_SIZE = 33554432; // 32mb
|
||||
private final int MAGNITUDE_BASE = 100;
|
||||
|
||||
|
||||
private final AWSS3BlobStore ablobstore;
|
||||
private final PayloadSlicer slicer;
|
||||
|
||||
// calculated only once, but not from the constructor
|
||||
private volatile long parts; // required number of parts with chunkSize
|
||||
private volatile long chunkSize;
|
||||
private volatile long remaining; // number of bytes remained for the last part
|
||||
|
||||
|
||||
// sequentially updated values
|
||||
private volatile int part;
|
||||
private volatile long chunkOffset;
|
||||
private volatile long copied;
|
||||
|
||||
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) {
|
||||
this.ablobstore = ablobstore;
|
||||
|
||||
@Inject
|
||||
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) {
|
||||
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||
this.slicer = checkNotNull(slicer, "slicer");
|
||||
}
|
||||
|
||||
protected long calculateChunkSize(long length) {
|
||||
|
@ -100,43 +108,39 @@ public class SequentialMultipartUploadStrategy implements
|
|||
this.chunkSize = partSize;
|
||||
this.parts = parts;
|
||||
this.remaining = length - partSize * parts;
|
||||
System.out.println(" " + length + " bytes partitioned in " + parts
|
||||
+ " parts of part size: " + chunkSize + ", remaining: "
|
||||
+ remaining + (remaining > MAX_PART_SIZE ? " overflow!" : ""));
|
||||
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
|
||||
remaining, (remaining > MAX_PART_SIZE ? " overflow!" : ""));
|
||||
return this.chunkSize;
|
||||
}
|
||||
|
||||
|
||||
protected long getParts() {
|
||||
return parts;
|
||||
}
|
||||
|
||||
|
||||
protected int getNextPart() {
|
||||
return ++part;
|
||||
}
|
||||
|
||||
|
||||
protected void addCopied(long copied) {
|
||||
this.copied += copied;
|
||||
}
|
||||
|
||||
|
||||
protected long getNextChunkOffset() {
|
||||
long next = chunkOffset;
|
||||
chunkOffset += getChunkSize();
|
||||
return next;
|
||||
}
|
||||
|
||||
|
||||
protected long getChunkSize() {
|
||||
return chunkSize;
|
||||
}
|
||||
|
||||
|
||||
protected long getRemaining() {
|
||||
return remaining;
|
||||
}
|
||||
|
||||
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId,
|
||||
int part, File file, long chunkOffset, long chunkSize) {
|
||||
Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize);
|
||||
chunkedPart.getContentMetadata().setContentLength(chunkSize);
|
||||
//chukedPayload.getContentMetadata().setContentMD5(???);
|
||||
|
||||
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part,
|
||||
Payload chunkedPart) {
|
||||
String eTag = null;
|
||||
try {
|
||||
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
|
||||
|
@ -153,31 +157,27 @@ public class SequentialMultipartUploadStrategy implements
|
|||
|
||||
@Override
|
||||
public String execute(String container, Blob blob) {
|
||||
Payload payload = blob.getPayload();
|
||||
if (payload instanceof FilePayload) {
|
||||
String key = blob.getMetadata().getName();
|
||||
File file = FilePayload.class.cast(payload).getRawContent();
|
||||
calculateChunkSize(file.length());
|
||||
long parts = getParts();
|
||||
if (parts > 0) {
|
||||
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
|
||||
String uploadId = client.initiateMultipartUpload(container,
|
||||
ObjectMetadataBuilder.create().key(key).build()); // TODO md5
|
||||
Map<Integer, String> etags = Maps.newHashMap();
|
||||
int part;
|
||||
while ((part = getNextPart()) <= getParts()) {
|
||||
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), chunkSize);
|
||||
etags.put(new Integer(part), eTag);
|
||||
}
|
||||
long remaining = getRemaining();
|
||||
if (remaining > 0) {
|
||||
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), remaining);
|
||||
etags.put(new Integer(part), eTag);
|
||||
}
|
||||
return client.completeMultipartUpload(container, key, uploadId, etags);
|
||||
} else {
|
||||
return ablobstore.putBlob(container, blob);
|
||||
String key = blob.getMetadata().getName();
|
||||
calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength());
|
||||
long parts = getParts();
|
||||
if (parts > 0) {
|
||||
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
|
||||
String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO
|
||||
// md5
|
||||
Map<Integer, String> etags = Maps.newHashMap();
|
||||
int part;
|
||||
while ((part = getNextPart()) <= getParts()) {
|
||||
String eTag = prepareUploadPart(client, container, key, uploadId, part,
|
||||
slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize));
|
||||
etags.put(new Integer(part), eTag);
|
||||
}
|
||||
long remaining = getRemaining();
|
||||
if (remaining > 0) {
|
||||
String eTag = prepareUploadPart(client, container, key, uploadId, part,
|
||||
slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining));
|
||||
etags.put(new Integer(part), eTag);
|
||||
}
|
||||
return client.completeMultipartUpload(container, key, uploadId, etags);
|
||||
} else {
|
||||
return ablobstore.putBlob(container, blob);
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.jclouds.blobstore.BlobStoreContextFactory;
|
|||
import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer;
|
||||
import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest;
|
||||
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
|
||||
import org.jclouds.netty.config.NettyPayloadModule;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.inject.Module;
|
||||
|
@ -45,7 +46,7 @@ public class AWSS3TestInitializer extends TransientBlobStoreTestInitializer {
|
|||
protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion,
|
||||
String app, String identity, String credential) throws IOException {
|
||||
return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule,
|
||||
new Log4JLoggingModule()), setupProperties(endpoint, apiversion, identity, credential));
|
||||
new Log4JLoggingModule(), new NettyPayloadModule()), setupProperties(endpoint, apiversion, identity, credential));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import javax.inject.Named;
|
|||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.azure.storage.domain.BoundedSet;
|
||||
import org.jclouds.azureblob.AzureBlobAsyncClient;
|
||||
import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob;
|
||||
import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata;
|
||||
|
@ -42,7 +43,6 @@ import org.jclouds.azureblob.domain.BlobProperties;
|
|||
import org.jclouds.azureblob.domain.ContainerProperties;
|
||||
import org.jclouds.azureblob.domain.ListBlobsResponse;
|
||||
import org.jclouds.azureblob.options.ListBlobsOptions;
|
||||
import org.jclouds.azure.storage.domain.BoundedSet;
|
||||
import org.jclouds.blobstore.BlobStoreContext;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.domain.BlobMetadata;
|
||||
|
@ -79,18 +79,18 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
|
||||
@Inject
|
||||
AzureAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, AzureBlobAsyncClient async,
|
||||
ContainerToResourceMetadata container2ResourceMd,
|
||||
ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions,
|
||||
ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob,
|
||||
BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions) {
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
|
||||
@Memoized Supplier<Set<? extends Location>> locations, AzureBlobAsyncClient async,
|
||||
ContainerToResourceMetadata container2ResourceMd,
|
||||
ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions,
|
||||
ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob,
|
||||
BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd,
|
||||
BlobToHttpGetOptions blob2ObjectGetOptions) {
|
||||
super(context, blobUtils, service, defaultLocation, locations);
|
||||
this.async = checkNotNull(async, "async");
|
||||
this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd");
|
||||
this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions,
|
||||
"blobStore2AzureContainerListOptions");
|
||||
"blobStore2AzureContainerListOptions");
|
||||
this.azure2BlobStoreResourceList = checkNotNull(azure2BlobStoreResourceList, "azure2BlobStoreResourceList");
|
||||
this.azureBlob2Blob = checkNotNull(azureBlob2Blob, "azureBlob2Blob");
|
||||
this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob");
|
||||
|
@ -105,15 +105,15 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
@Override
|
||||
public ListenableFuture<org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>> list() {
|
||||
return Futures
|
||||
.compose(
|
||||
async.listContainers(includeMetadata()),
|
||||
new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
|
||||
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
|
||||
BoundedSet<ContainerProperties> from) {
|
||||
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd),
|
||||
from.getNextMarker());
|
||||
}
|
||||
}, service);
|
||||
.compose(
|
||||
async.listContainers(includeMetadata()),
|
||||
new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
|
||||
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
|
||||
BoundedSet<ContainerProperties> from) {
|
||||
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), from
|
||||
.getNextMarker());
|
||||
}
|
||||
}, service);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -243,4 +243,9 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
throw new UnsupportedOperationException("please use deleteContainer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue