Issue 486: moved netty to a driver and created a base payload slicer

This commit is contained in:
Adrian Cole 2011-03-06 00:04:46 -05:00 committed by Tibor Kiss
parent 9bbfcad144
commit e5af84ef0e
27 changed files with 597 additions and 203 deletions

View File

@ -250,4 +250,9 @@ public class AtmosAsyncBlobStore extends BaseAsyncBlobStore {
return async.deletePath(container + "/" + key);
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -660,4 +660,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
return eTag;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -84,12 +84,12 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
@Inject
protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations);
this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
this.async = checkNotNull(async, "async");
@ -109,11 +109,11 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
@Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
return Futures.compose(async.listOwnedBuckets(),
new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null);
}
}, service);
new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null);
}
}, service);
}
/**
@ -153,9 +153,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
ListBucketOptions httpOptions = container2BucketListOptions.apply(options);
ListenableFuture<ListBucketResponse> returnVal = async.listBucket(container, httpOptions);
ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, bucket2ResourceList,
service);
service);
return (options.isDetailed()) ? Futures.compose(list,
fetchBlobMetadataProvider.get().setContainerName(container), service) : list;
fetchBlobMetadataProvider.get().setContainerName(container), service) : list;
}
/**
@ -238,4 +238,9 @@ public class S3AsyncBlobStore extends BaseAsyncBlobStore {
return async.deleteObject(container, key);
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -55,12 +55,16 @@ public class S3BlobStoreContextModule extends AbstractModule {
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(AsyncBlobStore.class).to(S3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(BlobStore.class).to(S3BlobStore.class).in(Scopes.SINGLETON);
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
}).in(Scopes.SINGLETON);
bindContext();
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
bindBucketLocationStrategy();
}
protected void bindContext() {
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<S3Client, S3AsyncClient>>() {
}).in(Scopes.SINGLETON);
}
protected void bindBucketLocationStrategy() {
bind(new TypeLiteral<Function<BucketMetadata, Location>>() {
}).to(LocationFromBucketLocation.class);

View File

@ -83,13 +83,13 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Inject
SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async,
ContainerToResourceMetadata container2ResourceMd,
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync, CommonSwiftAsyncClient async,
ContainerToResourceMetadata container2ResourceMd,
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations);
this.sync = sync;
this.async = async;
@ -109,12 +109,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
return Futures.compose(async.listContainers(),
new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
Set<ContainerMetadata> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null);
}
}, service);
new Function<Set<ContainerMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<ContainerMetadata> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), null);
}
}, service);
}
/**
@ -145,12 +144,12 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list(String container, ListContainerOptions options) {
org.jclouds.openstack.swift.options.ListContainerOptions httpOptions = container2ContainerListOptions
.apply(options);
.apply(options);
ListenableFuture<PageSet<ObjectInfo>> returnVal = async.listObjects(container, httpOptions);
ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, container2ResourceList,
service);
service);
return options.isDetailed() ? Futures.compose(list, fetchBlobMetadataProvider.get().setContainerName(container),
service) : list;
service) : list;
}
/**
@ -177,14 +176,14 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
@Override
public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) {
return Futures.compose(async.getObjectInfo(container, key),
new Function<MutableObjectInfoWithMetadata, BlobMetadata>() {
new Function<MutableObjectInfoWithMetadata, BlobMetadata>() {
@Override
public BlobMetadata apply(MutableObjectInfoWithMetadata from) {
return object2BlobMd.apply(from);
}
@Override
public BlobMetadata apply(MutableObjectInfoWithMetadata from) {
return object2BlobMd.apply(from);
}
}, service);
}, service);
}
/**
@ -235,4 +234,9 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
return !sync.containerExists(container);
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}

View File

@ -32,6 +32,7 @@ import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.ListenableFuture;
/**
@ -126,6 +127,12 @@ public interface AsyncBlobStore {
*/
ListenableFuture<String> putBlob(String container, Blob blob);
/**
* @see BlobStore#putBlobMultipart
*/
@Beta
ListenableFuture<String> putBlobMultipart(String container, Blob blob);
/**
* @see BlobStore#blobMetadata
*/

View File

@ -32,6 +32,8 @@ import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import com.google.common.annotations.Beta;
/**
* Synchronous access to a BlobStore such as Amazon S3
*
@ -48,17 +50,18 @@ public interface BlobStore {
/**
* creates a new blob with the specified name.
*
* @see #blobBuilder
*/
@Deprecated
Blob newBlob(String name);
/**
*
* @return builder for creating new {@link Blob}s
*/
BlobBuilder blobBuilder(String name);
/**
* The get locations command returns all the valid locations for containers. A location has a
* scope, which is typically region or zone. A region is a general area, like eu-west, where a
@ -199,7 +202,7 @@ public interface BlobStore {
* if the container doesn't exist
*/
String putBlob(String container, Blob blob);
/**
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
* using multipart strategies.
@ -214,6 +217,7 @@ public interface BlobStore {
* @throws ContainerNotFoundException
* if the container doesn't exist
*/
@Beta
String putBlobMultipart(String container, Blob blob);
/**

View File

@ -675,4 +675,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return containerToLocation;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
// TODO implement
return putBlob(container, blob);
}
}

View File

@ -119,11 +119,6 @@
<artifactId>jsr305</artifactId>
<version>1.3.9</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
</dependency>
</dependencies>
<profiles>

View File

@ -0,0 +1,47 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io;
import org.jclouds.io.internal.BasePayloadSlicer;
import com.google.inject.ImplementedBy;
/**
*
* @author Adrian Cole
*/
@ImplementedBy(BasePayloadSlicer.class)
public interface PayloadSlicer {
/**
* Returns a {@link Payload} that returns input streams from the an underlying payload, where
* each stream starts at the given offset and is limited to the specified number of bytes.
*
* @param input
* the payload from which to get the raw streams
* @param offset
* the offset in bytes into the underlying stream where the returned streams will start
* @param length
* the maximum length of the returned streams
* @throws IllegalArgumentException
* if offset or length are negative
*/
Payload slice(Payload input, long offset, long length);
}

View File

@ -35,7 +35,6 @@ import javax.annotation.Nullable;
import org.jclouds.crypto.CryptoStreams;
import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.ChunkedFilePayload;
import org.jclouds.io.payloads.FilePayload;
import org.jclouds.io.payloads.InputStreamPayload;
import org.jclouds.io.payloads.StringPayload;
@ -84,10 +83,6 @@ public class Payloads {
public static FilePayload newFilePayload(File data) {
return new FilePayload(checkNotNull(data, "data"));
}
public static ChunkedFilePayload newChunkedFilePayload(File data, int part, long chunkOffset, long chunkSize) {
return new ChunkedFilePayload(checkNotNull(data, "data"), part, chunkOffset, chunkSize);
}
public static UrlEncodedFormPayload newUrlEncodedFormPayload(Multimap<String, String> formParams, char... skips) {
return new UrlEncodedFormPayload(formParams, skips);

View File

@ -0,0 +1,104 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import javax.inject.Singleton;
import org.jclouds.io.InputSuppliers;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.payloads.BaseMutableContentMetadata;
import org.jclouds.io.payloads.InputStreamSupplierPayload;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
/**
*
* @author Adrian Cole
*/
@Singleton
public class BasePayloadSlicer implements PayloadSlicer {
/**
* {@inheritDoc}
*/
@Override
public Payload slice(Payload input, long offset, long length) {
checkNotNull(input);
checkArgument(offset >= 0, "offset is negative");
checkArgument(length >= 0, "length is negative");
Payload returnVal;
if (input.getRawContent() instanceof File) {
returnVal = doSlice((File) input.getRawContent(), offset, length);
} else if (input.getRawContent() instanceof String) {
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
} else if (input.getRawContent() instanceof byte[]) {
returnVal = doSlice((byte[]) input.getRawContent(), offset, length);
} else {
returnVal = doSlice(input.getInput(), offset, length);
}
return copyMetadataAndSetLength(input, returnVal, length);
}
protected Payload doSlice(Payload content, long offset, long length) {
return new InputStreamSupplierPayload(ByteStreams.slice(content, offset, length));
}
protected Payload doSlice(String content, long offset, long length) {
return doSlice(content.getBytes(), offset, length);
}
protected Payload doSlice(File content, long offset, long length) {
try {
return doSlice(new FileInputStream(content), offset, length);
} catch (FileNotFoundException e) {
Throwables.propagate(e);
return null;
}
}
protected Payload doSlice(InputStream content, long offset, long length) {
return new InputStreamSupplierPayload(ByteStreams.slice(InputSuppliers.of(content), offset, length));
}
protected Payload doSlice(byte[] content, long offset, long length) {
Payload returnVal;
checkArgument(offset <= Integer.MAX_VALUE, "offset is too big for an array");
checkArgument(length <= Integer.MAX_VALUE, "length is too big for an array");
returnVal = new InputStreamSupplierPayload(
ByteStreams.newInputStreamSupplier(content, (int) offset, (int) length));
return returnVal;
}
protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, long length) {
returnVal.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata()
.toBuilder().contentLength(length).contentMD5(null).build()));
return returnVal;
}
}

View File

@ -0,0 +1,75 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.io.payloads;
import static com.google.common.io.Closeables.closeQuietly;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
/**
* @author Adrian Cole
*/
public class InputStreamSupplierPayload extends BasePayload<InputSupplier<? extends InputStream>> {
private List<InputStream> toClose = Lists.newArrayList();
public InputStreamSupplierPayload(InputSupplier<? extends InputStream> content) {
super(content);
}
/**
* {@inheritDoc}
*/
@Override
public InputStream getInput() {
try {
InputStream returnVal = content.getInput();
toClose.add(returnVal);
return returnVal;
} catch (IOException e) {
Throwables.propagate(e);
return null;
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean isRepeatable() {
return true;
}
/**
* if we created the stream, then it is already consumed on close.
*/
@Override
public void release() {
if (toClose.size() > 0)
for (InputStream content = toClose.remove(0); toClose.size() > 0; content = toClose.remove(0))
closeQuietly(content);
}
}

BIN
drivers/netty/.pom.xml.swp Normal file

Binary file not shown.

82
drivers/netty/pom.xml Normal file
View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
====================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
====================================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-project</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../project/pom.xml</relativePath>
</parent>
<groupId>org.jclouds.driver</groupId>
<artifactId>jclouds-netty</artifactId>
<name>jclouds netty payload module</name>
<description>jclouds netty payload module</description>
<!-- bootstrapping: need to fetch the project POM -->
<repositories>
<repository>
<id>jclouds-sona-snapshots-nexus</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>jboss-public-releases</id>
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,20 @@
package org.jclouds.netty.config;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.netty.io.NettyPayloadSlicer;
import com.google.inject.AbstractModule;
/**
*
* @author Adrian Cole
*
*/
public class NettyPayloadModule extends AbstractModule {
@Override
protected void configure() {
bind(PayloadSlicer.class).to(NettyPayloadSlicer.class);
}
}

View File

@ -17,10 +17,9 @@
* ====================================================================
*/
package org.jclouds.io.payloads;
package org.jclouds.netty.io;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
@ -30,24 +29,24 @@ import org.jboss.netty.handler.stream.ChunkedFile;
/**
*
*
*
*
*
*
*
* @author Tibor Kiss
*/
public class ChunkedFileInputStream extends InputStream {
private static final int CHUNK_SIZE = 8192;
private ChunkedFile chunks;
private ChannelBuffer chunk;
private IOException ex;
public ChunkedFileInputStream(String filename, long offset, long length) {
this(new File(filename), offset, length);
}
public ChunkedFileInputStream(File file, long offset, long length) {
try {
this.chunks = new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, CHUNK_SIZE);
@ -62,9 +61,9 @@ public class ChunkedFileInputStream extends InputStream {
}
if (chunk == null) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
}
}
if (chunk != null) {
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
if (chunk.readableBytes() < 1 && chunks.hasNextChunk()) {
chunk = ChannelBuffer.class.cast(chunks.nextChunk());
if (chunk.readableBytes() < 1) {
return null;
@ -75,7 +74,7 @@ public class ChunkedFileInputStream extends InputStream {
}
return chunk;
}
@Override
public int read() throws IOException {
try {
@ -87,15 +86,15 @@ public class ChunkedFileInputStream extends InputStream {
int readIndex = chunk.readerIndex();
byte abyte = chunk.getByte(readIndex);
chunk.readerIndex(readIndex + 1);
return (int)abyte;
return (int) abyte;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
try {
ChannelBuffer chunk = getChunk();
if (chunk == null)
return -1;
@ -111,7 +110,7 @@ public class ChunkedFileInputStream extends InputStream {
return readable;
} catch (Exception e) {
throw new IOException(e);
}
}
}
@Override

View File

@ -17,34 +17,26 @@
* ====================================================================
*/
package org.jclouds.io.payloads;
package org.jclouds.netty.io;
import java.io.File;
import java.io.InputStream;
public class ChunkedFilePayload extends FilePayload {
import javax.inject.Singleton;
private int part;
private long chunkOffset;
private long chunkSize;
public ChunkedFilePayload(File content) {
this(content, 1, 0, content.length());
}
public ChunkedFilePayload(File content, int part, long chunkOffset, long chunkSize) {
super(content);
this.part = part;
this.chunkOffset = chunkOffset;
this.chunkSize = chunkSize;
}
public int getPart() {
return part;
}
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.internal.BasePayloadSlicer;
/**
*
* @author Adrian Cole
*/
@Singleton
public class NettyPayloadSlicer extends BasePayloadSlicer {
@Override
public InputStream getInput() {
return new ChunkedFileInputStream(getRawContent(), chunkOffset, chunkSize);
protected Payload doSlice(File content, long offset, long length) {
return Payloads.newInputStreamPayload(new ChunkedFileInputStream(content, offset, length));
}
}
}

View File

@ -38,6 +38,7 @@
<module>bouncycastle</module>
<module>log4j</module>
<module>jsch</module>
<module>netty</module>
<module>enterprise</module>
</modules>
</project>

View File

@ -42,6 +42,21 @@
<test.aws-s3.credential>${test.aws.credential}</test.aws-s3.credential>
</properties>
<!-- temporary -->
<repositories>
<repository>
<id>jboss-public-releases</id>
<url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jclouds.api</groupId>
@ -94,6 +109,18 @@
<version>1.2.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jclouds.driver</groupId>
<artifactId>jclouds-netty</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -25,7 +25,6 @@ import java.util.Properties;
import org.jclouds.aws.s3.blobstore.config.AWSS3BlobStoreContextModule;
import org.jclouds.aws.s3.config.AWSS3RestClientModule;
import org.jclouds.s3.S3ContextBuilder;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Module;

View File

@ -29,7 +29,9 @@ import javax.inject.Provider;
import org.jclouds.Constants;
import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
import org.jclouds.blobstore.util.BlobUtils;
@ -44,13 +46,17 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlob;
import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/**
*
*
* @author Tibor Kiss
*/
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@ -58,10 +64,19 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions,
blob2Object, object2BlobMd, fetchBlobMetadataProvider);
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object,
object2BlobMd, fetchBlobMetadataProvider);
this.multipartUploadStrategy = multipartUploadStrategy;
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
// TODO: make this better
// need to use a provider if the strategy object is stateful
return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob));
}
}

View File

@ -26,7 +26,6 @@ import javax.inject.Provider;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
@ -45,32 +44,30 @@ import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Supplier;
/**
* Proived AWS S3 specific extensions.
*
* Proived AWS S3 specific extensions.
*
* @author Tibor Kiss
*/
public class AWSS3BlobStore extends S3BlobStore {
private MultipartUploadStrategy multipartUploadStrategy;
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject
AWSS3BlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
@Memoized Supplier<Set<? extends Location>> locations, AWSS3Client sync,
BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, defaultLocation, locations, sync, bucket2ResourceMd, container2BucketListOptions,
bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd,
fetchBlobMetadataProvider);
multipartUploadStrategy = new SequentialMultipartUploadStrategy(this);
this.multipartUploadStrategy = multipartUploadStrategy;
}
/* (non-Javadoc)
* @see org.jclouds.blobstore.internal.BaseBlobStore#putBlobMultipart(java.lang.String, org.jclouds.blobstore.domain.Blob)
*/
@Override
public String putBlobMultipart(String container, Blob blob) {
return multipartUploadStrategy.execute(container, blob);
// need to use a provider if the strategy object is stateful
return multipartUploadStrategy.get().execute(container, blob);
}
}

View File

@ -23,15 +23,12 @@ import org.jclouds.aws.s3.AWSS3AsyncClient;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobRequestSigner;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.attr.ConsistencyModel;
import org.jclouds.blobstore.config.BlobStoreMapModule;
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
import org.jclouds.location.config.RegionsLocationModule;
import org.jclouds.s3.blobstore.S3BlobRequestSigner;
import org.jclouds.s3.blobstore.S3AsyncBlobStore;
import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.config.S3BlobStoreContextModule;
import com.google.inject.Scopes;
@ -39,21 +36,23 @@ import com.google.inject.TypeLiteral;
/**
*
*
*
* @author Tibor Kiss
*/
public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
@Override
protected void configure() {
install(new BlobStoreMapModule());
install(new RegionsLocationModule());
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
super.configure();
bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
}
@Override
protected void bindContext() {
bind(BlobStoreContext.class).to(new TypeLiteral<BlobStoreContextImpl<AWSS3Client, AWSS3AsyncClient>>() {
}).in(Scopes.SINGLETON);
bind(BlobRequestSigner.class).to(S3BlobRequestSigner.class);
bindBucketLocationStrategy();
}
}

View File

@ -19,19 +19,23 @@
package org.jclouds.aws.s3.blobstore.strategy.internal;
import static org.jclouds.io.Payloads.newChunkedFilePayload;
import java.io.File;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.aws.s3.AWSS3Client;
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.io.Payload;
import org.jclouds.io.payloads.FilePayload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.logging.Logger;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import com.google.common.collect.Maps;
@ -40,35 +44,39 @@ import com.google.common.collect.Maps;
* Provides a sequential multipart upload strategy.
*
* The file partitioning algorithm:
*
* The default partition size we choose is 32mb.
* A multiple of this default partition size is used.
* The number of parts first grows to a chosen magnitude (for example 100 parts),
* then it grows the partition size instead of number of partitions.
* When we reached the maximum part size, then again it starts to grow the number of partitions.
*
*
* The default partition size we choose is 32mb. A multiple of this default partition size is used.
* The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the
* partition size instead of number of partitions. When we reached the maximum part size, then again
* it starts to grow the number of partitions.
*
* @author Tibor Kiss
*/
public class SequentialMultipartUploadStrategy implements
MultipartUploadStrategy {
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
@Resource
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
protected Logger logger = Logger.NULL;
private final long DEFAULT_PART_SIZE = 33554432; // 32mb
private final long DEFAULT_PART_SIZE = 33554432; // 32mb
private final int MAGNITUDE_BASE = 100;
private final AWSS3BlobStore ablobstore;
private final PayloadSlicer slicer;
// calculated only once, but not from the constructor
private volatile long parts; // required number of parts with chunkSize
private volatile long chunkSize;
private volatile long remaining; // number of bytes remained for the last part
// sequentially updated values
private volatile int part;
private volatile long chunkOffset;
private volatile long copied;
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore) {
this.ablobstore = ablobstore;
@Inject
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) {
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
this.slicer = checkNotNull(slicer, "slicer");
}
protected long calculateChunkSize(long length) {
@ -100,43 +108,39 @@ public class SequentialMultipartUploadStrategy implements
this.chunkSize = partSize;
this.parts = parts;
this.remaining = length - partSize * parts;
System.out.println(" " + length + " bytes partitioned in " + parts
+ " parts of part size: " + chunkSize + ", remaining: "
+ remaining + (remaining > MAX_PART_SIZE ? " overflow!" : ""));
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
remaining, (remaining > MAX_PART_SIZE ? " overflow!" : ""));
return this.chunkSize;
}
protected long getParts() {
return parts;
}
protected int getNextPart() {
return ++part;
}
protected void addCopied(long copied) {
this.copied += copied;
}
protected long getNextChunkOffset() {
long next = chunkOffset;
chunkOffset += getChunkSize();
return next;
}
protected long getChunkSize() {
return chunkSize;
}
protected long getRemaining() {
return remaining;
}
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId,
int part, File file, long chunkOffset, long chunkSize) {
Payload chunkedPart = newChunkedFilePayload(file, part, chunkOffset, chunkSize);
chunkedPart.getContentMetadata().setContentLength(chunkSize);
//chukedPayload.getContentMetadata().setContentMD5(???);
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part,
Payload chunkedPart) {
String eTag = null;
try {
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
@ -153,31 +157,27 @@ public class SequentialMultipartUploadStrategy implements
@Override
public String execute(String container, Blob blob) {
Payload payload = blob.getPayload();
if (payload instanceof FilePayload) {
String key = blob.getMetadata().getName();
File file = FilePayload.class.cast(payload).getRawContent();
calculateChunkSize(file.length());
long parts = getParts();
if (parts > 0) {
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
String uploadId = client.initiateMultipartUpload(container,
ObjectMetadataBuilder.create().key(key).build()); // TODO md5
Map<Integer, String> etags = Maps.newHashMap();
int part;
while ((part = getNextPart()) <= getParts()) {
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), chunkSize);
etags.put(new Integer(part), eTag);
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part, file, getNextChunkOffset(), remaining);
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else {
return ablobstore.putBlob(container, blob);
String key = blob.getMetadata().getName();
calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength());
long parts = getParts();
if (parts > 0) {
AWSS3Client client = (AWSS3Client) ablobstore.getContext().getProviderSpecificContext().getApi();
String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); // TODO
// md5
Map<Integer, String> etags = Maps.newHashMap();
int part;
while ((part = getNextPart()) <= getParts()) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), chunkSize));
etags.put(new Integer(part), eTag);
}
long remaining = getRemaining();
if (remaining > 0) {
String eTag = prepareUploadPart(client, container, key, uploadId, part,
slicer.slice(blob.getPayload(), getNextChunkOffset(), remaining));
etags.put(new Integer(part), eTag);
}
return client.completeMultipartUpload(container, key, uploadId, etags);
} else {
return ablobstore.putBlob(container, blob);
}

View File

@ -26,6 +26,7 @@ import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.blobstore.integration.TransientBlobStoreTestInitializer;
import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
import org.jclouds.netty.config.NettyPayloadModule;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;
@ -45,7 +46,7 @@ public class AWSS3TestInitializer extends TransientBlobStoreTestInitializer {
protected BlobStoreContext createLiveContext(Module configurationModule, String endpoint, String apiversion,
String app, String identity, String credential) throws IOException {
return new BlobStoreContextFactory().createContext(provider, ImmutableSet.of(configurationModule,
new Log4JLoggingModule()), setupProperties(endpoint, apiversion, identity, credential));
new Log4JLoggingModule(), new NettyPayloadModule()), setupProperties(endpoint, apiversion, identity, credential));
}
}

View File

@ -30,6 +30,7 @@ import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.azure.storage.domain.BoundedSet;
import org.jclouds.azureblob.AzureBlobAsyncClient;
import org.jclouds.azureblob.blobstore.functions.AzureBlobToBlob;
import org.jclouds.azureblob.blobstore.functions.BlobPropertiesToBlobMetadata;
@ -42,7 +43,6 @@ import org.jclouds.azureblob.domain.BlobProperties;
import org.jclouds.azureblob.domain.ContainerProperties;
import org.jclouds.azureblob.domain.ListBlobsResponse;
import org.jclouds.azureblob.options.ListBlobsOptions;
import org.jclouds.azure.storage.domain.BoundedSet;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
@ -79,18 +79,18 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
@Inject
AzureAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AzureBlobAsyncClient async,
ContainerToResourceMetadata container2ResourceMd,
ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions,
ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob,
BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd,
BlobToHttpGetOptions blob2ObjectGetOptions) {
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AzureBlobAsyncClient async,
ContainerToResourceMetadata container2ResourceMd,
ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions,
ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob,
BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd,
BlobToHttpGetOptions blob2ObjectGetOptions) {
super(context, blobUtils, service, defaultLocation, locations);
this.async = checkNotNull(async, "async");
this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd");
this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions,
"blobStore2AzureContainerListOptions");
"blobStore2AzureContainerListOptions");
this.azure2BlobStoreResourceList = checkNotNull(azure2BlobStoreResourceList, "azure2BlobStoreResourceList");
this.azureBlob2Blob = checkNotNull(azureBlob2Blob, "azureBlob2Blob");
this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob");
@ -105,15 +105,15 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
@Override
public ListenableFuture<org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>> list() {
return Futures
.compose(
async.listContainers(includeMetadata()),
new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
BoundedSet<ContainerProperties> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd),
from.getNextMarker());
}
}, service);
.compose(
async.listContainers(includeMetadata()),
new Function<BoundedSet<ContainerProperties>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(
BoundedSet<ContainerProperties> from) {
return new PageSetImpl<StorageMetadata>(Iterables.transform(from, container2ResourceMd), from
.getNextMarker());
}
}, service);
}
/**
@ -243,4 +243,9 @@ public class AzureAsyncBlobStore extends BaseAsyncBlobStore {
throw new UnsupportedOperationException("please use deleteContainer");
}
@Override
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
return putBlob(container, blob);
}
}