Issue 486: moved netty to a driver and created a base payload slicer

This commit is contained in:
Adrian Cole 2011-03-06 00:04:46 -05:00
parent 16843d9a92
commit b6667353f9
27 changed files with 597 additions and 203 deletions

View File

@ -250,4 +250,9 @@ public class AtmosAsyncBlobStore extends BaseAsyncBlobStore {
return async.deletePath(container + "/" + key); return async.deletePath(container + "/" + key);
} }
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
} }

View File

@ -660,4 +660,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5()); String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
return eTag; return eTag;
} }
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
} }

View File

@ -84,12 +84,12 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
@Inject @Inject
protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync, @Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) { Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations); super(context, blobUtils, service, defaultLocation, locations);
this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions"); this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
this.async = checkNotNull(async, "async"); this.async = checkNotNull(async, "async");
@ -109,11 +109,11 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list() { public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
return Futures.compose(async.listOwnedBuckets(), return Futures.compose(async.listOwnedBuckets(),
new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() { new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) { public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null); return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null);
} }
}, service); }, service);
} }
/** /**
@ -153,9 +153,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
ListBucketOptions httpOptions = container2BucketListOptions.apply(options); ListBucketOptions httpOptions = container2BucketListOptions.apply(options);
ListenableFuture<ListBucketResponse> returnVal = async.listBucket(container, httpOptions); ListenableFuture<ListBucketResponse> returnVal = async.listBucket(container, httpOptions);
ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, bucket2ResourceList, ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, bucket2ResourceList,
service); service);
return (options.isDetailed()) ? Futures.compose(list, return (options.isDetailed()) ? Futures.compose(list,
fetchBlobMetadataProvider.get().setContainerName(container), service) : list; fetchBlobMetadataProvider.get().setContainerName(container), service) : list;
} }
/** /**
@ -238,4 +238,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
return async.deleteObject(container, key); return async.deleteObject(container, key);
} }
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
} }

View File

@ -55,12 +55,16 @@ public class S3BlobStoreContextModule extends AbstractModule {
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL); bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON); bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON); bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON);
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() { bindContext();
}).in(Scopes.SINGLETON);
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class); bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
bindBucketLocationStrategy(); bindBucketLocationStrategy();
} }
protected void bindContext() {
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
}).in(Scopes.SINGLETON);
}
protected void bindBucketLocationStrategy() { protected void bindBucketLocationStrategy() {
bind(new TypeLiteral<Function<BucketMetadata, Location>>() { bind(new TypeLiteral<Function<BucketMetadata, Location>>() {
}).to(LocationFromBucketLocation.class); }).to(LocationFromBucketLocation.class);

View File

@ -83,13 +83,13 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Inject @Inject
SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async, @Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async,
ContainerToResourceMetadata container2ResourceMd, ContainerToResourceMetadata container2ResourceMd,
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) { Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations); super(context, blobUtils, service, defaultLocation, locations);
this.sync = sync; this.sync = sync;
this.async = async; this.async = async;
@ -109,12 +109,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list() { public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
return Futures.compose(async.listContainers(), return Futures.compose(async.listContainers(),
new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() { new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply( public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<ContainerMetadata> from) {
Set<ContainerMetadata> from) { return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null);
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null); }
} }, service);
}, service);
} }
/** /**
@ -145,12 +144,12 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list(String container, ListContainerOptions options) { public ListenableFuture<PageSet<? extends StorageMetadata>> list(String container, ListContainerOptions options) {
org.jclouds.openstack.swift.options.ListContainerOptions httpOptions = container2ContainerListOptions org.jclouds.openstack.swift.options.ListContainerOptions httpOptions = container2ContainerListOptions
.apply(options); .apply(options);
ListenableFuture<PageSet<ObjectInfo>> returnVal = async.listObjects(container, httpOptions); ListenableFuture<PageSet<ObjectInfo>> returnVal = async.listObjects(container, httpOptions);
ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, container2ResourceList, ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, container2ResourceList,
service); service);
return options.isDetailed() ? Futures.compose(list, fetchBlobMetadataProvider.get().setContainerName(container), return options.isDetailed() ? Futures.compose(list, fetchBlobMetadataProvider.get().setContainerName(container),
service) : list; service) : list;
} }
/** /**
@ -177,14 +176,14 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) { public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) {
return Futures.compose(async.getObjectInfo(container, key), return Futures.compose(async.getObjectInfo(container, key),
new Function<MutableObjectInfoWithMetadata, BlobMetadata>() { new Function<MutableObjectInfoWithMetadata, BlobMetadata>() {
@Override @Override
public BlobMetadata apply(MutableObjectInfoWithMetadata from) { public BlobMetadata apply(MutableObjectInfoWithMetadata from) {
return object2BlobMd.apply(from); return object2BlobMd.apply(from);
} }
}, service); }, service);
} }
/** /**
@ -235,4 +234,9 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
return !sync.containerExists(container); return !sync.containerExists(container);
} }
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
} }

View File

@ -32,6 +32,7 @@ import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location; import org.jclouds.domain.Location;
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
/** /**
@ -126,6 +127,12 @@ public interface AsyncBlobStore {
*/ */
ListenableFuture<String> putBlob(String container, Blob blob); ListenableFuture<String> putBlob(String container, Blob blob);
/**
* @see BlobStore#putBlobMultipart
*/
@Beta
ListenableFuture<String> putBlobMultipart(String container, Blob blob);
/** /**
* @see BlobStore#blobMetadata * @see BlobStore#blobMetadata
*/ */

View File

@ -32,6 +32,8 @@ import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location; import org.jclouds.domain.Location;
import com.google.common.annotations.Beta;
/** /**
* Synchronous access to a BlobStore such as Amazon S3 * Synchronous access to a BlobStore such as Amazon S3
* *
@ -48,17 +50,18 @@ public interface BlobStore {
/** /**
* creates a new blob with the specified name. * creates a new blob with the specified name.
*
* @see #blobBuilder * @see #blobBuilder
*/ */
@Deprecated @Deprecated
Blob newBlob(String name); Blob newBlob(String name);
/** /**
* *
* @return builder for creating new {@link Blob}s * @return builder for creating new {@link Blob}s
*/ */
BlobBuilder blobBuilder(String name); BlobBuilder blobBuilder(String name);
/** /**
* The get locations command returns all the valid locations for containers. A location has a * The get locations command returns all the valid locations for containers. A location has a
* scope, which is typically region or zone. A region is a general area, like eu-west, where a * scope, which is typically region or zone. A region is a general area, like eu-west, where a
@ -199,7 +202,7 @@ public interface BlobStore {
* if the container doesn't exist * if the container doesn't exist
*/ */
String putBlob(String container, Blob blob); String putBlob(String container, Blob blob);
/** /**
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name} * Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
* using multipart strategies. * using multipart strategies.
@ -214,6 +217,7 @@ public interface BlobStore {
* @throws ContainerNotFoundException * @throws ContainerNotFoundException
* if the container doesn't exist * if the container doesn't exist
*/ */
@Beta
String putBlobMultipart(String container, Blob blob); String putBlobMultipart(String container, Blob blob);
/** /**

View File

@ -675,4 +675,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return containerToLocation; return containerToLocation;
} }
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
// TODO implement
return putBlob(container, blob);
}
} }

View File

@ -119,11 +119,6 @@
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
<version>1.3.9</version> <version>1.3.9</version>
</dependency> </dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -0,0 +1,47 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io;
import org.jclouds.io.internal.BasePayloadSlicer;
import com.google.inject.ImplementedBy;
/**
*
* @author Adrian Cole
*/
@ImplementedBy(BasePayloadSlicer.class)
public interface PayloadSlicer {
/**
* Returns a {@link Payload} that returns input streams from the an underlying payload, where
* each stream starts at the given offset and is limited to the specified number of bytes.
*
* @param input
* the payload from which to get the raw streams
* @param offset
* the offset in bytes into the underlying stream where the returned streams will start
* @param length
* the maximum length of the returned streams
* @throws IllegalArgumentException
* if offset or length are negative
*/
Payload slice(Payload input, long offset, long length);
}

View File

@ -35,7 +35,6 @@ import javax.annotation.Nullable;
import org.jclouds.crypto.CryptoStreams; import org.jclouds.crypto.CryptoStreams;
import org.jclouds.io.payloads.ByteArrayPayload; import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.ChunkedFilePayload;
import org.jclouds.io.payloads.FilePayload; import org.jclouds.io.payloads.FilePayload;
import org.jclouds.io.payloads.InputStreamPayload; import org.jclouds.io.payloads.InputStreamPayload;
import org.jclouds.io.payloads.StringPayload; import org.jclouds.io.payloads.StringPayload;
@ -84,10 +83,6 @@ public class Payloads {
public static FilePayload newFilePayload(File data) { public static FilePayload newFilePayload(File data) {
return new FilePayload(checkNotNull(data, "data")); return new FilePayload(checkNotNull(data, "data"));
} }
public static ChunkedFilePayload newChunkedFilePayload(File data, int part, long chunkOffset, long chunkSize) {
return new ChunkedFilePayload(checkNotNull(data, "data"), part, chunkOffset, chunkSize);
}
public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap<String, String> formParams, char... skips) { public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap<String, String> formParams, char... skips) {
return new UrlEncodedFormPayload(formParams, skips); return new UrlEncodedFormPayload(formParams, skips);

View File

@ -0,0 +1,104 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import javax.inject.Singleton;
import org.jclouds.io.InputSuppliers;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.payloads.BaseMutableContentMetadata;
import org.jclouds.io.payloads.InputStreamSupplierPayload;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
/**
*
* @author Adrian Cole
*/
@Singleton
public class BasePayloadSlicer implements PayloadSlicer {
/**
* {@inheritDoc}
*/
@Override
public Payload slice(Payload input, long offset, long length) {
checkNotNull(input);
checkArgument(offset >= 0, "offset is negative");
checkArgument(length >= 0, "length is negative");
Payload returnVal;
if (input.getRawContent() instanceof File) {
returnVal = doSlice((File) input.getRawContent(), offset, length);
} else if (input.getRawContent() instanceof String) {
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
} else if (input.getRawContent() instanceof byte[]) {
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
} else {
returnVal = doSlice(input.getInput(), offset, length);
}
return copyMetadataAndSetLength(input, returnVal, length);
}
protected Payload doSlice(Payload content, long offset, long length) {
return new InputStreamSupplierPayload(ByteStreams.slice(content, offset, length));
}
protected Payload doSlice(String content, long offset, long length) {
return doSlice(content.getBytes(), offset, length);
}
protected Payload doSlice(File content, long offset, long length) {
try {
return doSlice(new FileInputStream(content), offset, length);
} catch (FileNotFoundException e) {
Throwables.propagate(e);
return null;
}
}
protected Payload doSlice(InputStream content, long offset, long length) {
return new InputStreamSupplierPayload(ByteStreams.slice(InputSuppliers.of(content), offset, length));
}
protected Payload doSlice(byte[] content, long offset, long length) {
Payload returnVal;
checkArgument(offset <= Integer.MAX_VALUE, "offset is too big for an array");
checkArgument(length <= Integer.MAX_VALUE, "length is too big for an array");
returnVal = new InputStreamSupplierPayload(
ByteStreams.newInputStreamSupplier(content, (int) offset, (int) length));
return returnVal;
}
protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, long length) {
returnVal.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata()
.toBuilder().contentLength(length).contentMD5(null).build()));
return returnVal;
}
}

View File

@ -0,0 +1,75 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import static com.google.common.io.Closeables.closeQuietly;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
/**
* @author Adrian Cole
*/
public class InputStreamSupplierPayload extends BasePayload<InputSupplier<? extends InputStream>> {
private List<InputStream> toClose = Lists.newArrayList();
public InputStreamSupplierPayload(InputSupplier<? extends InputStream> content) {
super(content);
}
/**
* {@inheritDoc}
*/
@Override
public InputStream getInput() {
try {
InputStream returnVal = content.getInput();
toClose.add(returnVal);
return returnVal;
} catch (IOException e) {
Throwables.propagate(e);
return null;
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean isRepeatable() {
return true;
}
/**
* if we created the stream, then it is already consumed on close.
*/
@Override
public void release() {
if (toClose.size() > 0)
for (InputStream content = toClose.remove(0); toClose.size() > 0; content = toClose.remove(0))
closeQuietly(content);
}
}

BIN
drivers/netty/.pom.xml.swp Normal file

Binary file not shown.

82
drivers/netty/pom.xml Normal file
View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
====================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
====================================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-project</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../project/pom.xml</relativePath>
</parent>
<groupId>org.jclouds.driver</groupId>
<artifactId>jclouds-netty</artifactId>
<name>jclouds netty payload module</name>
<description>jclouds netty payload module</description>
<!-- bootstrapping: need to fetch the project POM -->
<repositories>
<repository>
<id>jclouds-sona-snapshots-nexus</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>jboss-public-releases</id>
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,20 @@
package org.jclouds.netty.config;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.netty.io.NettyPayloadSlicer;
import com.google.inject.AbstractModule;
/**
*
* @author Adrian Cole
*
*/
public class NettyPayloadModule extends AbstractModule {
@Override
protected void configure() {
bind(PayloadSlicer.class).to(NettyPayloadSlicer.class);
}
}

View File

@ -17,10 +17,9 @@
* ==================================================================== * ====================================================================
*/ */
package org.jclouds.io.payloads; package org.jclouds.netty.io;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@ -30,24 +29,24 @@ import org.jboss.netty.handler.stream.ChunkedFile;
/** /**
* *
* *
* *
* *
* @author Tibor Kiss * @author Tibor Kiss
*/ */
public class ChunkedFileInputStream extends InputStream { public class ChunkedFileInputStream extends InputStream {
private static final int CHUNK_SIZE = 8192; private static final int CHUNK_SIZE = 8192;
private ChunkedFile chunks; private ChunkedFile chunks;
private ChannelBuffer chunk; private ChannelBuffer chunk;
private IOException ex; private IOException ex;
public ChunkedFileInputStream(String filename, long offset, long length) { public ChunkedFileInputStream(String filename, long offset, long length) {
this(new File(filename), offset, length); this(new File(filename), offset, length);
} }
public ChunkedFileInputStream(File file, long offset, long length) { public ChunkedFileInputStream(File file, long offset, long length) {
try { try {
this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE); this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE);
@ -62,9 +61,9 @@ public class ChunkedFileInputStream extends InputStream {
} }
if (chunk == null) { if (chunk == null) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk()); chunk = ChannelBuffer.class.cast(chunks.nextChunk());
} }
if (chunk != null) { if (chunk != null) {
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) { if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk()); chunk = ChannelBuffer.class.cast(chunks.nextChunk());
if (chunk.readableBytes() < 1) { if (chunk.readableBytes() < 1) {
return null; return null;
@ -75,7 +74,7 @@ public class ChunkedFileInputStream extends InputStream {
} }
return chunk; return chunk;
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
try { try {
@ -87,15 +86,15 @@ public class ChunkedFileInputStream extends InputStream {
int readIndex = chunk.readerIndex(); int readIndex = chunk.readerIndex();
byte abyte = chunk.getByte(readIndex); byte abyte = chunk.getByte(readIndex);
chunk.readerIndex(readIndex + 1); chunk.readerIndex(readIndex + 1);
return (int)abyte; return (int) abyte;
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
try { try {
ChannelBuffer chunk = getChunk(); ChannelBuffer chunk = getChunk();
if (chunk == null) if (chunk == null)
return -1; return -1;
@ -111,7 +110,7 @@ public class ChunkedFileInputStream extends InputStream {
return readable; return readable;
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
} }
@Override @Override

View File

@ -17,34 +17,26 @@
* ==================================================================== * ====================================================================
*/ */
package org.jclouds.io.payloads; package org.jclouds.netty.io;
import java.io.File; import java.io.File;
import java.io.InputStream;
public class ChunkedFilePayload extends FilePayload { import javax.inject.Singleton;
private int part; import org.jclouds.io.Payload;
private long chunkOffset; import org.jclouds.io.Payloads;
private long chunkSize; import org.jclouds.io.internal.BasePayloadSlicer;
public ChunkedFilePayload(File content) { /**
this(content, 1, 0, content.length()); *
} * @author Adrian Cole
*/
public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) { @Singleton
super(content); public class NettyPayloadSlicer extends BasePayloadSlicer {
this.part = part;
this.chunkOffset = chunkOffset;
this.chunkSize = chunkSize;
}
public int getPart() {
return part;
}
@Override @Override
public InputStream getInput() { protected Payload doSlice(File content, long offset, long length) {
return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize); return Payloads.newInputStreamPayload(new ChunkedFileInputStream(content, offset, length));
} }
}
}

View File

@ -38,6 +38,7 @@
<module>bouncycastle</module> <module>bouncycastle</module>
<module>log4j</module> <module>log4j</module>
<module>jsch</module> <module>jsch</module>
<module>netty</module>
<module>enterprise</module> <module>enterprise</module>
</modules> </modules>
</project> </project>

View File

@ -42,6 +42,21 @@
<test.aws-s3.credential>${test.aws.credential}</test.aws-s3.credential> <test.aws-s3.credential>${test.aws.credential}</test.aws-s3.credential>
</properties> </properties>
<!-- temporary -->
<repositories>
<repository>
<id>jboss-public-releases</id>
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.jclouds.api</groupId> <groupId>org.jclouds.api</groupId>
@ -94,6 +109,18 @@
<version>1.2.16</version> <version>1.2.16</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jclouds.driver</groupId>
<artifactId>jclouds-netty</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -25,7 +25,6 @@ import java.util.Properties;
import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule; import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule;
import org.jclouds.aws.s3.config.AWSS3RestClientModule; import org.jclouds.aws.s3.config.AWSS3RestClientModule;
import org.jclouds.s3.S3ContextBuilder; import org.jclouds.s3.S3ContextBuilder;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Module; import com.google.inject.Module;

View File

@ -29,7 +29,9 @@ import javax.inject.Provider;
import org.jclouds.Constants; import org.jclouds.Constants;
import org.jclouds.aws.s3.AWSS3AsyncClient; import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions; import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata; import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils; import org.jclouds.blobstore.util.BlobUtils;
@ -44,13 +46,17 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata; import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/** /**
* *
* @author Tibor Kiss * @author Tibor Kiss
*/ */
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject @Inject
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@ -58,10 +64,19 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) { Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd, Provider<MultipartUploadStrategy> multipartUploadStrategy) {
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
blob2Object, object2BlobMd, fetchBlobMetadataProvider); container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object,
object2BlobMd, fetchBlobMetadataProvider);
this.multipartUploadStrategy = multipartUploadStrategy;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
// TODO: make this better
// need to use a provider if the strategy object is stateful
return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob));
} }
} }

View File

@ -26,7 +26,6 @@ import javax.inject.Provider;
import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions; import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
@ -45,32 +44,30 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
/** /**
* Proived AWS S3 specific extensions. * Proived AWS S3 specific extensions.
* *
* @author Tibor Kiss * @author Tibor Kiss
*/ */
public class AWSS3BlobStore extends S3BlobStore { public class AWSS3BlobStore extends S3BlobStore {
private MultipartUploadStrategy multipartUploadStrategy; private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject @Inject
AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation, AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync, @Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions, BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) { Provider<FetchBlobMetadata> fetchBlobMetadataProvider, Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions, super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions,
bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd,
fetchBlobMetadataProvider); fetchBlobMetadataProvider);
multipartUploadStrategy = new SequentialMultipartUploadStrategy(this); this.multipartUploadStrategy = multipartUploadStrategy;
} }
/* (non-Javadoc)
* @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob)
*/
@Override @Override
public String putBlobMultipart(String container, Blob blob) { public String putBlobMultipart(String container, Blob blob) {
return multipartUploadStrategy.execute(container, blob); // need to use a provider if the strategy object is stateful
return multipartUploadStrategy.get().execute(container, blob);
} }
} }

View File

@ -23,15 +23,12 @@ import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore; import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.blobstore.AsyncBlobStore; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.BlobRequestSigner; import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.attr.ConsistencyModel;
import org.jclouds.blobstore.config.BlobStoreMapModule;
import org.jclouds.blobstore.internal.BlobStoreContextImpl; import org.jclouds.blobstore.internal.BlobStoreContextImpl;
import org.jclouds.location.config.RegionsLocationModule; import org.jclouds.s3.blobstore.S3AsyncBlobStore;
import org.jclouds.s3.blobstore.S3BlobRequestSigner; import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule; import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Scopes; import com.google.inject.Scopes;
@ -39,21 +36,23 @@ import com.google.inject.TypeLiteral;
/** /**
* *
* *
* @author Tibor Kiss * @author Tibor Kiss
*/ */
public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule { public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
@Override @Override
protected void configure() { protected void configure() {
install(new BlobStoreMapModule()); super.configure();
install(new RegionsLocationModule()); bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL); bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON); bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON); }
@Override
protected void bindContext() {
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() { bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
}).in(Scopes.SINGLETON); }).in(Scopes.SINGLETON);
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
bindBucketLocationStrategy();
} }
} }

View File

@ -19,19 +19,23 @@
package org.jclouds.aws.s3.blobstore.strategy.internal; package org.jclouds.aws.s3.blobstore.strategy.internal;
import static org.jclouds.io.Payloads.newChunkedFilePayload; import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.util.Map; import java.util.Map;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.io.Payload; import org.jclouds.io.Payload;
import org.jclouds.io.payloads.FilePayload; import org.jclouds.io.PayloadSlicer;
import org.jclouds.logging.Logger;
import org.jclouds.s3.domain.ObjectMetadataBuilder; import org.jclouds.s3.domain.ObjectMetadataBuilder;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -40,35 +44,39 @@ import com.google.common.collect.Maps;
* Provides a sequential multipart upload strategy. * Provides a sequential multipart upload strategy.
* *
* The file partitioning algorithm: * The file partitioning algorithm:
* *
* The default partition size we choose is 32mb. * The default partition size we choose is 32mb. A multiple of this default partition size is used.
* A multiple of this default partition size is used. * The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the
* The number of parts first grows to a chosen magnitude (for example 100 parts), * partition size instead of number of partitions. When we reached the maximum part size, then again
* then it grows the partition size instead of number of partitions. * it starts to grow the number of partitions.
* When we reached the maximum part size, then again it starts to grow the number of partitions. *
*
* @author Tibor Kiss * @author Tibor Kiss
*/ */
public class SequentialMultipartUploadStrategy implements public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
MultipartUploadStrategy { @Resource
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
protected Logger logger = Logger.NULL;
private final long DEFAULT_PART_SIZE = 33554432; // 32mb private final long DEFAULT_PART_SIZE = 33554432; // 32mb
private final int MAGNITUDE_BASE = 100; private final int MAGNITUDE_BASE = 100;
private final AWSS3BlobStore ablobstore; private final AWSS3BlobStore ablobstore;
private final PayloadSlicer slicer;
// calculated only once, but not from the constructor // calculated only once, but not from the constructor
private volatile long parts; // required number of parts with chunkSize private volatile long parts; // required number of parts with chunkSize
private volatile long chunkSize; private volatile long chunkSize;
private volatile long remaining; // number of bytes remained for the last part private volatile long remaining; // number of bytes remained for the last part
// sequentially updated values // sequentially updated values
private volatile int part; private volatile int part;
private volatile long chunkOffset; private volatile long chunkOffset;
private volatile long copied; private volatile long copied;
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) { @Inject
this.ablobstore = ablobstore; public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) {
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
this.slicer = checkNotNull(slicer, "slicer");
} }
protected long calculateChunkSize(long length) { protected long calculateChunkSize(long length) {
@ -100,43 +108,39 @@ public class SequentialMultipartUploadStrategy implements
this.chunkSize = partSize; this.chunkSize = partSize;
this.parts = parts; this.parts = parts;
this.remaining = length - partSize * parts; this.remaining = length - partSize * parts;
System.out.println(" " + length + " bytes partitioned in " + parts logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
+ " parts of part size: " + chunkSize + ", remaining: " remaining, (remaining > MAX_PART_SIZE ? " overflow!" : ""));
+ remaining + (remaining > MAX_PART_SIZE ? " overflow!" : ""));
return this.chunkSize; return this.chunkSize;
} }
protected long getParts() { protected long getParts() {
return parts; return parts;
} }
protected int getNextPart() { protected int getNextPart() {
return ++part; return ++part;
} }
protected void addCopied(long copied) { protected void addCopied(long copied) {
this.copied += copied; this.copied += copied;
} }
protected long getNextChunkOffset() { protected long getNextChunkOffset() {
long next = chunkOffset; long next = chunkOffset;
chunkOffset += getChunkSize(); chunkOffset += getChunkSize();
return next; return next;
} }
protected long getChunkSize() { protected long getChunkSize() {
return chunkSize; return chunkSize;
} }
protected long getRemaining() { protected long getRemaining() {
return remaining; return remaining;
} }
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part,
int part, File file, long chunkOffset, long chunkSize) { Payload chunkedPart) {
Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize);
chunkedPart.getContentMetadata().setContentLength(chunkSize);
//chukedPayload.getContentMetadata().setContentMD5(???);
String eTag = null; String eTag = null;
try { try {
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
@ -153,31 +157,27 @@ public class SequentialMultipartUploadStrategy implements
@Override @Override
public String execute(String container, Blob blob) { public String execute(String container, Blob blob) {
Payload payload = blob.getPayload(); String key = blob.getMetadata().getName();
if (payload instanceof FilePayload) { calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength());
String key = blob.getMetadata().getName(); long parts = getParts();
File file = FilePayload.class.cast(payload).getRawContent(); if (parts > 0) {
calculateChunkSize(file.length()); AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
long parts = getParts(); String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO
if (parts > 0) { // md5
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi(); Map<Integer, String> etags = Maps.newHashMap();
String uploadId = client.initiateMultipartUpload(container, int part;
ObjectMetadataBuilder.create().key(key).build()); // TODO md5 while ((part = getNextPart()) <= getParts()) {
Map<Integer, String> etags = Maps.newHashMap(); String eTag = prepareUploadPart(client, container, key, uploadId, part,
int part; slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize));
while ((part = getNextPart()) <= getParts()) { etags.put(new Integer(part), eTag);
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), chunkSize);
etags.put(new Integer(part), eTag);
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), remaining);
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else {
return ablobstore.putBlob(container, blob);
} }
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining));
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else { } else {
return ablobstore.putBlob(container, blob); return ablobstore.putBlob(container, blob);
} }

View File

@ -26,6 +26,7 @@ import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer; import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer;
import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest; import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest;
import org.jclouds.logging.log4j.config.Log4JLoggingModule; import org.jclouds.logging.log4j.config.Log4JLoggingModule;
import org.jclouds.netty.config.NettyPayloadModule;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.inject.Module; import com.google.inject.Module;
@ -45,7 +46,7 @@ public class AWSS3TestInitializer extends TransientBlobStoreTestInitializer {
protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion, protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion,
String app, String identity, String credential) throws IOException { String app, String identity, String credential) throws IOException {
return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule, return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule,
new Log4JLoggingModule()), setupProperties(endpoint, apiversion, identity, credential)); new Log4JLoggingModule(), new NettyPayloadModule()), setupProperties(endpoint, apiversion, identity, credential));
} }
} }

View File

@ -30,6 +30,7 @@ import javax.inject.Named;
import javax.inject.Singleton; import javax.inject.Singleton;
import org.jclouds.Constants; import org.jclouds.Constants;
import org.jclouds.azure.storage.domain.BoundedSet;
import org.jclouds.azureblob.AzureBlobAsyncClient; import org.jclouds.azureblob.AzureBlobAsyncClient;
import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob; import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob;
import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata; import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata;
@ -42,7 +43,6 @@ import org.jclouds.azureblob.domain.BlobProperties;
import org.jclouds.azureblob.domain.ContainerProperties; import org.jclouds.azureblob.domain.ContainerProperties;
import org.jclouds.azureblob.domain.ListBlobsResponse; import org.jclouds.azureblob.domain.ListBlobsResponse;
import org.jclouds.azureblob.options.ListBlobsOptions; import org.jclouds.azureblob.options.ListBlobsOptions;
import org.jclouds.azure.storage.domain.BoundedSet;
import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.domain.BlobMetadata;
@ -79,18 +79,18 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
@Inject @Inject
AzureAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, AzureAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AzureBlobAsyncClient async, @Memoized Supplier<Set<? extends Location>> locations, AzureBlobAsyncClient async,
ContainerToResourceMetadata container2ResourceMd, ContainerToResourceMetadata container2ResourceMd,
ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions, ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions,
ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob, ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob,
BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd, BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd,
BlobToHttpGetOptions blob2ObjectGetOptions) { BlobToHttpGetOptions blob2ObjectGetOptions) {
super(context, blobUtils, service, defaultLocation, locations); super(context, blobUtils, service, defaultLocation, locations);
this.async = checkNotNull(async, "async"); this.async = checkNotNull(async, "async");
this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd"); this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd");
this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions, this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions,
"blobStore2AzureContainerListOptions"); "blobStore2AzureContainerListOptions");
this.azure2BlobStoreResourceList = checkNotNull(azure2BlobStoreResourceList, "azure2BlobStoreResourceList"); this.azure2BlobStoreResourceList = checkNotNull(azure2BlobStoreResourceList, "azure2BlobStoreResourceList");
this.azureBlob2Blob = checkNotNull(azureBlob2Blob, "azureBlob2Blob"); this.azureBlob2Blob = checkNotNull(azureBlob2Blob, "azureBlob2Blob");
this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob"); this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob");
@ -105,15 +105,15 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>> list() { public ListenableFuture<org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>> list() {
return Futures return Futures
.compose( .compose(
async.listContainers(includeMetadata()), async.listContainers(includeMetadata()),
new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() { new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply( public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
BoundedSet<ContainerProperties> from) { BoundedSet<ContainerProperties> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), from
from.getNextMarker()); .getNextMarker());
} }
}, service); }, service);
} }
/** /**
@ -243,4 +243,9 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
throw new UnsupportedOperationException("please use deleteContainer"); throw new UnsupportedOperationException("please use deleteContainer");
} }
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
} }