Allows users to download large files efficiently and directly to disk.

This commit is contained in:
Zack Shoylev 2016-07-14 11:06:36 -05:00
parent 4c92763663
commit 05c05e3de2
12 changed files with 656 additions and 51 deletions

View File

@ -19,6 +19,7 @@ package org.jclouds.atmos.blobstore;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.atmos.options.PutOptions.Builder.publicRead;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
@ -115,7 +116,7 @@ public class AtmosBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AtmosClient#createDirectory}
*
*
* @param location
* currently ignored
* @param container
@ -148,7 +149,7 @@ public class AtmosBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AtmosClient#createDirectory}
*
*
* @param container
* directory name
*/
@ -183,7 +184,7 @@ public class AtmosBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AtmosClient#pathExists}
*
*
* @param container
* container
* @param key
@ -341,6 +342,11 @@ public class AtmosBlobStore extends BaseBlobStore {
throw new UnsupportedOperationException("Atmos does not support multipart uploads");
}
/**
 * Streaming download is not supported by the Atmos provider.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public InputStream streamBlob(String container, String name) {
   // Fixed copy-paste: the original message incorrectly referred to multipart uploads.
   throw new UnsupportedOperationException("Atmos does not support streaming a blob");
}
@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName,
CopyOptions options) {

View File

@ -25,15 +25,26 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursi
import static org.jclouds.location.predicates.LocationPredicates.idEquals;
import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.KeyNotFoundException;
@ -80,7 +91,6 @@ import org.jclouds.openstack.swift.v1.options.UpdateContainerOptions;
import org.jclouds.openstack.swift.v1.reference.SwiftHeaders;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
@ -96,10 +106,12 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.net.HttpHeaders;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
@ -109,7 +121,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
@Inject
protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext context, SwiftApi api,
@Memoized Supplier<Set<? extends Location>> locations, @Assisted String regionId,
PayloadSlicer slicer) {
PayloadSlicer slicer, @Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
checkNotNull(regionId, "regionId");
Optional<? extends Location> found = tryFind(locations.get(), idEquals(regionId));
checkArgument(found.isPresent(), "region %s not in %s", regionId, locations.get());
@ -119,6 +131,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
this.toResourceMetadata = new ToResourceMetadata(found.get());
this.context = context;
this.api = api;
this.userExecutor = userExecutor;
// until we parameterize ClearListStrategy with a factory
this.clearList = baseGraph.createChildInjector(new AbstractModule() {
@Override
@ -137,6 +150,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
private final ToListContainerOptions toListContainerOptions = new ToListContainerOptions();
private final ToResourceMetadata toResourceMetadata;
protected final PayloadSlicer slicer;
protected final ListeningExecutorService userExecutor;
@Override
public Set<? extends Location> listAssignableLocations() {
@ -586,10 +600,9 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
throw new UnsupportedOperationException();
}
@com.google.inject.Inject
@Named(PROPERTY_USER_THREADS)
@VisibleForTesting
ListeningExecutorService userExecutor;
@com.google.inject.Inject(optional = true)
@Named(Constants.PROPERTY_MAX_RETRIES)
protected int retryCountLimit = 5;
/**
* Upload using a user-provided executor, or the jclouds userExecutor
@ -618,7 +631,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength);
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides);
int partNumber = 1;
int partNumber = 0;
for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
BlobUploader b =
@ -645,4 +658,220 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
return uploadMultipartPart(mpu, partNumber, payload);
}
}
@Override
@Beta
// Convenience overload: parallel range download to a local file using the
// jclouds user executor (PROPERTY_USER_THREADS).
public void downloadBlob(String container, String name, File destination) {
downloadBlob(container, name, destination, userExecutor);
}
/**
 * Downloads the blob {@code container/name} into {@code destination} by
 * fetching fixed-size byte ranges in parallel on the supplied executor and
 * writing each range at its final offset via a memory-mapped region.
 *
 * @param container   container holding the blob
 * @param name        blob name
 * @param destination local file to write; deleted on I/O failure
 * @param executor    executor the range downloads run on
 */
@Override
@Beta
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
   ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
   RandomAccessFile raf = null;
   try {
      // Metadata-only request: the content length is needed to compute the
      // ranges without downloading the body.
      // NOTE(review): getContentLength() unboxing assumes the provider always
      // returns a length — confirm for all Swift deployments.
      long contentLength = api
            .getObjectApi(regionId, container)
            .getWithoutBody(name)
            .getPayload()
            .getContentMetadata()
            .getContentLength();

      raf = new RandomAccessFile(destination.getAbsoluteFile(), "rw");
      if (contentLength > 0) {
         // Reserve space up front for performance reasons. Guarded so a
         // zero-length blob does not seek to -1.
         raf.seek(contentLength - 1);
         raf.write(0);
      }

      // Download buffer size: smaller means less memory usage; larger is
      // faster as long as threads are saturated.
      long partSize = getMinimumMultipartPartSize();

      // Schedule one range download per part and wait for all of them.
      List<ListenableFuture<Void>> results = new ArrayList<ListenableFuture<Void>>();
      for (long from = 0; from < contentLength; from += partSize) {
         long to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
         results.add(listeningExecutor.submit(new BlobDownloader(regionId, container, name, raf, from, to)));
      }
      Futures.getUnchecked(Futures.allAsList(results));
   } catch (IOException e) {
      // Close before deleting so the delete can succeed on all platforms,
      // then remove the partially-written file.
      if (raf != null) {
         try {
            raf.close();
         } catch (IOException ignored) {
         }
      }
      destination.delete();
      throw new RuntimeException(e);
   } finally {
      // Always release the file handle; the original code leaked it on the
      // success path. RandomAccessFile.close() is idempotent, so the failure
      // path's early close above is harmless.
      if (raf != null) {
         try {
            raf.close();
         } catch (IOException ignored) {
         }
      }
   }
}
/**
 * Downloads one [begin, end] byte range of an object and writes it at its
 * final offset in the shared {@link RandomAccessFile} via a mapped region.
 * Retries the whole range up to {@code retryCountLimit} times on IOException.
 */
private final class BlobDownloader implements Callable<Void> {
   // Immutable task state: one instance handles exactly one range.
   private final String regionId;
   private final String containerName;
   private final String objectName;
   private final RandomAccessFile raf;
   private final long begin;
   private final long end;

   BlobDownloader(String regionId, String containerName, String objectName, RandomAccessFile raf, long begin, long end) {
      this.regionId = regionId;
      this.containerName = containerName;
      this.objectName = objectName;
      this.raf = raf;
      this.begin = begin;
      this.end = end;
   }

   @Override
   public Void call() {
      IOException lastException = null;
      for (int retry = 0; retry < retryCountLimit; retry++) {
         try {
            SwiftObject object = api.getObjectApi(regionId, containerName)
                  .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
            // Download first; this is the part that usually fails. Close the
            // payload stream (the original leaked it).
            InputStream is = object.getPayload().openStream();
            byte[] targetArray;
            try {
               targetArray = ByteStreams.toByteArray(is);
            } finally {
               is.close();
            }
            // Map the file region and write the block at its final offset.
            MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1);
            out.put(targetArray);
            out.force();
         } catch (IOException e) {
            lastException = e;
            continue;
         }
         // Success!
         return null;
      }
      throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException);
   }
}
@Beta
@Override
// Convenience overload: parallel range streaming using the jclouds user
// executor (PROPERTY_USER_THREADS).
public InputStream streamBlob(final String container, final String name) {
return streamBlob(container, name, userExecutor);
}
/**
 * Streams the blob {@code container/name} by downloading fixed-size byte
 * ranges in parallel and piping them, in order, into the returned
 * {@link InputStream}. The caller must consume (and close) the stream;
 * a stalled reader eventually blocks the writer on the bounded pipe.
 *
 * @param container container holding the blob
 * @param name      blob name
 * @param executor  executor the range downloads and the pipe writer run on
 * @return the input end of the pipe; closed ends signal failure to the reader
 */
@Beta
@Override
public InputStream streamBlob(final String container, final String name, final ExecutorService executor) {
   final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);

   // The user receives the input end of the piped stream; a background task
   // pushes downloaded parts into the output end.
   final PipedOutputStream output;
   final PipedInputStream input;
   try {
      output = new PipedOutputStream();
      // Pipe buffer of five part sizes, clamped to Integer.MAX_VALUE.
      input = new PipedInputStream(output,
            getMinimumMultipartPartSize() * 5 > Integer.MAX_VALUE ?
                  Integer.MAX_VALUE : (int) getMinimumMultipartPartSize() * 5);
   } catch (IOException e) {
      throw new RuntimeException(e);
   }

   // The total length of the object is needed to determine ranges; it must be
   // obtainable without downloading the whole object.
   final long contentLength = api
         .getObjectApi(regionId, container)
         .getWithoutBody(name)
         .getPayload()
         .getContentMetadata()
         .getContentLength();

   // Download buffer size: smaller means less memory usage; larger is faster
   // as long as threads are saturated.
   final long partSize = getMinimumMultipartPartSize();

   // Hands completed part futures from the producer to the consumer in order.
   final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();

   // Consumer: writes each part to the pipe in order. Debug printlns removed;
   // the dead null-check is gone too (LinkedBlockingQueue.take() never
   // returns null — it blocks instead).
   listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         for (long from = 0; from < contentLength; from += partSize) {
            try {
               ListenableFuture<byte[]> result = results.take();
               output.write(result.get());
            } catch (Exception e) {
               // Close both ends so the client is notified of the failure.
               try {
                  input.close();
               } catch (IOException ignored) {
               }
               try {
                  output.close();
               } catch (IOException ignored) {
               }
               throw new RuntimeException(e);
            }
         }
         // Finished writing all parts; close to signal end-of-stream.
         try {
            output.close();
         } catch (IOException ignored) {
         }
      }
   });

   // Producer: schedules one range download per part, in file order.
   listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         for (long from = 0; from < contentLength; from += partSize) {
            long to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
            results.add(listeningExecutor.submit(new BlobStreamDownloader(container, name, from, to)));
         }
      }
   });

   return input;
}
/**
 * Downloads one [begin, end] byte range of an object and returns it as a
 * byte array. Retries the whole range up to {@code retryCountLimit} times
 * on IOException.
 */
private final class BlobStreamDownloader implements Callable<byte[]> {
   // Immutable task state: one instance handles exactly one range.
   private final String containerName;
   private final String objectName;
   private final long begin;
   private final long end;

   BlobStreamDownloader(String containerName, String objectName, long begin, long end) {
      this.containerName = containerName;
      this.objectName = objectName;
      this.begin = begin;
      this.end = end;
   }

   @Override
   public byte[] call() {
      IOException lastException = null;
      for (int retry = 0; retry < retryCountLimit; retry++) {
         try {
            // Removed: unused nanoTime local and System.out debug print.
            SwiftObject object = api.getObjectApi(regionId, containerName)
                  .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
            // Close the payload stream (the original leaked it).
            InputStream is = object.getPayload().openStream();
            try {
               return ByteStreams.toByteArray(is);
            } finally {
               is.close();
            }
         } catch (IOException e) {
            lastException = e;
            continue;
         }
      }
      throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException);
   }
}
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.openstack.swift.v1.blobstore;
import static org.assertj.core.util.Files.delete;
import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
import static org.jclouds.openstack.keystone.v2_0.config.KeystoneProperties.CREDENTIAL_TYPE;
import static org.testng.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest;
import org.jclouds.io.payloads.FilePayload;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
// TODO: Roll tests up to BaseBlobStoreIntegrationTest
@Test(groups = "live", singleThreaded = true)
public class RegionScopedSwiftBlobStoreParallelLiveTest extends BaseBlobStoreIntegrationTest {

   private final File BIG_FILE = new File("random.dat");
   // ~1 GB test payload (the old "//10 * 1000 * 1000" comment was stale);
   // reduce for quicker local runs.
   private final long SIZE = 1000000000;
   private BlobStore blobStore;
   private String ETAG;
   // Bounded pool with CallerRunsPolicy so submissions throttle instead of
   // being rejected when the queue fills.
   private ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(
               MoreExecutors.getExitingExecutorService(
                     new ThreadPoolExecutor(5, 5,
                           5000L, TimeUnit.MILLISECONDS,
                           new ArrayBlockingQueue<Runnable>(10, true), new ThreadPoolExecutor.CallerRunsPolicy())));
   private String CONTAINER = "jcloudsparalleltest" + UUID.randomUUID();

   public RegionScopedSwiftBlobStoreParallelLiveTest() {
      provider = "openstack-swift";
   }

   // Override as needed for the right region
   protected BlobStore getBlobStore() {
      RegionScopedBlobStoreContext ctx = RegionScopedBlobStoreContext.class.cast(view);
      return ctx.getBlobStore("US-TX");
   }

   @Override
   protected Properties setupProperties() {
      Properties props = super.setupProperties();
      setIfTestSystemPropertyPresent(props, CREDENTIAL_TYPE);
      return props;
   }

   @BeforeClass
   public void setup() throws IOException, InterruptedException {
      blobStore = getBlobStore();
      createRandomFile(SIZE, BIG_FILE);
      HashCode hashCode = Files.hash(BIG_FILE, Hashing.md5());
      ETAG = hashCode.toString();
      blobStore.createContainerInLocation(null, CONTAINER);
      System.out.println("generated file md5: " + ETAG);
   }

   @AfterClass
   public void cleanupFiles() {
      // Delete local file
      delete(BIG_FILE);
      // Delete uploaded file
      blobStore.clearContainer(CONTAINER);
      blobStore.deleteContainer(CONTAINER);
   }

   @Test
   public void uploadMultipartBlob() {
      Blob blob = blobStore.blobBuilder(BIG_FILE.getName())
            .payload(new FilePayload(BIG_FILE))
            .build();
      // configure the blobstore to use multipart uploading of the file
      String eTag = blobStore.putBlob(CONTAINER, blob, multipart(executor));
      // Deliberately NOT asserted against ETAG: the etag returned by Swift is
      // not the md5 of the uploaded Blob, it is the md5 of the concatenated
      // segment md5s.
   }

   @Test(dependsOnMethods = "uploadMultipartBlob", singleThreaded = true)
   public void downloadParallelBlob() throws IOException {
      final File downloadedFile = new File(BIG_FILE.getName() + ".downloaded");
      blobStore.downloadBlob(CONTAINER, BIG_FILE.getName(), downloadedFile, executor);
      String eTag = Files.hash(downloadedFile, Hashing.md5()).toString();
      assertEquals(eTag, ETAG);
   }

   @Test(dependsOnMethods = "uploadMultipartBlob", singleThreaded = true)
   public void streamParallelBlob() throws IOException {
      InputStream is = blobStore.streamBlob(CONTAINER, BIG_FILE.getName(), executor);
      byte[] segment = new byte[1000000];
      Hasher hasher = Hashing.md5().newHasher();
      int read;
      // Compare against -1: read(byte[]) signals end-of-stream with -1, and
      // with a non-empty buffer never returns 0. (Per-read println removed.)
      while ((read = is.read(segment)) != -1) {
         hasher.putBytes(segment, 0, read);
      }
      is.close();
      assertEquals(hasher.hash().toString(), ETAG);
   }

   // Creates a file of the given size filled with random bytes, written by a
   // pool of 16 threads, one ~1 MB mapped region per task.
   private void createRandomFile(long size, File file) throws IOException, InterruptedException {
      // Reserve space up front for performance reasons.
      RandomAccessFile raf = new RandomAccessFile(file.getAbsoluteFile(), "rw");
      try {
         raf.seek(size - 1);
         raf.write(0);
         long partSize = 1000000;
         ExecutorService threadPool = Executors.newFixedThreadPool(16);
         for (long from = 0; from < size; from += partSize) {
            long to = (from + partSize >= size) ? size - 1 : from + partSize - 1;
            threadPool.submit(new RandomFileWriter(raf, from, to));
         }
         threadPool.shutdown();
         threadPool.awaitTermination(1, TimeUnit.DAYS);
      } finally {
         // Release the handle; the original code leaked it.
         raf.close();
      }
   }

   // Fills one [begin, end] region of the shared file with random bytes.
   private final class RandomFileWriter implements Runnable {
      private final RandomAccessFile raf;
      private final long begin;
      private final long end;

      RandomFileWriter(RandomAccessFile raf, long begin, long end) {
         this.raf = raf;
         this.begin = begin;
         this.end = end;
      }

      @Override
      public void run() {
         try {
            byte[] targetArray = new byte[(int) (end - begin + 1)];
            Random random = new Random();
            random.nextBytes(targetArray);
            // Map file region and write the random block at its final offset.
            MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1);
            out.put(targetArray);
            out.force();
         } catch (IOException e) {
            throw new RuntimeException(e);
         }
      }
   }
}

View File

@ -20,6 +20,7 @@ import static org.jclouds.rackspace.cloudidentity.v2_0.config.CloudIdentityCrede
import java.util.Properties;
import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContext;
import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContextLiveTest;
import org.testng.annotations.Test;

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.rackspace.cloudfiles.v1.blobstore;
import static org.jclouds.rackspace.cloudidentity.v2_0.config.CloudIdentityCredentialTypes.API_KEY_CREDENTIALS;
import java.util.Properties;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContext;
import org.jclouds.openstack.swift.v1.blobstore.RegionScopedSwiftBlobStoreParallelLiveTest;
import org.testng.annotations.Test;
// Applies the RegionScopedSwiftBlobStoreParallelLiveTest to Rackspace Cloud Files
@Test(groups = "live")
public class CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest extends RegionScopedSwiftBlobStoreParallelLiveTest {
public CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest() {
provider = "rackspace-cloudfiles";
}
// Run the parent's parallel upload/download/stream tests against the IAD region.
@Override
protected BlobStore getBlobStore() {
RegionScopedBlobStoreContext ctx = RegionScopedBlobStoreContext.class.cast(view);
return ctx.getBlobStore("IAD");
}
// Pick up the Rackspace API-key credential type from test system properties.
@Override
protected Properties setupProperties() {
Properties props = super.setupProperties();
setIfTestSystemPropertyPresent(props, API_KEY_CREDENTIALS);
return props;
}
}

View File

@ -122,7 +122,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#bucketExists}
*
*
* @param container
* bucket name
*/
@ -133,7 +133,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#putBucketInRegion}
*
*
* @param location
* corresponds to a Region
* @param container
@ -165,7 +165,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#listBucket}
*
*
* @param container
* bucket name
*/
@ -195,7 +195,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#objectExists}
*
*
* @param container
* bucket name
* @param key
@ -208,7 +208,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#headObject}
*
*
* @param container
* bucket name
* @param key
@ -221,7 +221,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#getObject}
*
*
* @param container
* bucket name
* @param key
@ -235,7 +235,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#putObject}
*
*
* @param container
* bucket name
* @param blob
@ -248,7 +248,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#putObject}
*
*
* @param container
* bucket name
* @param blob
@ -322,7 +322,7 @@ public class S3BlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link S3Client#deleteObject}
*
*
* @param container
* bucket name
* @param key

View File

@ -16,10 +16,11 @@
*/
package org.jclouds.blobstore;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import com.google.common.annotations.Beta;
import java.util.concurrent.ExecutorService;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobAccess;
@ -39,6 +40,8 @@ import org.jclouds.domain.Location;
import org.jclouds.io.Payload;
import org.jclouds.javax.annotation.Nullable;
import com.google.common.annotations.Beta;
/**
* Synchronous access to a BlobStore such as Amazon S3
*/
@ -49,7 +52,7 @@ public interface BlobStore {
BlobStoreContext getContext();
/**
*
*
* @return builder for creating new {@link Blob}s
*/
BlobBuilder blobBuilder(String name);
@ -75,14 +78,14 @@ public interface BlobStore {
/**
* Creates a namespace for your blobs
* <p/>
*
*
* A container is a namespace for your objects. Depending on the service, the scope can be
* global, identity, or sub-identity scoped. For example, in Amazon S3, containers are called
* buckets, and they must be uniquely named such that no-one else in the world conflicts. In
* other blobstores, the naming convention of the container is less strict. All blobstores allow
* you to list your containers and also the contents within them. These contents can either be
* blobs, folders, or virtual paths.
*
*
* @param location
* some blobstores allow you to specify a location, such as US-EAST, for where this
* container will exist. null will choose a default location
@ -93,7 +96,7 @@ public interface BlobStore {
boolean createContainerInLocation(@Nullable Location location, String container);
/**
*
*
* @param options
* controls default access control
* @see #createContainerInLocation(Location,String)
@ -108,7 +111,7 @@ public interface BlobStore {
/**
* Lists all resources in a container non-recursive.
*
*
* @param container
* what to list
* @return a list that may be incomplete, depending on whether PageSet#getNextMarker is set
@ -118,7 +121,7 @@ public interface BlobStore {
/**
* Like {@link #list(String)} except you can control the size, recursion, and context of the list
* using {@link ListContainerOptions options}
*
*
* @param container
* what to list
* @param options
@ -129,7 +132,7 @@ public interface BlobStore {
/**
* This will delete the contents of a container at its root path without deleting the container
*
*
* @param container
* what to clear
*/
@ -138,7 +141,7 @@ public interface BlobStore {
/**
* Like {@link #clearContainer(String)} except you can use options to do things like recursive
* deletes, or clear at a different path than root.
*
*
* @param container
* what to clear
* @param options
@ -148,7 +151,7 @@ public interface BlobStore {
/**
* This will delete everything inside a container recursively.
*
*
* @param container
* what to delete
* @param container name of the container to delete
@ -165,7 +168,7 @@ public interface BlobStore {
/**
* Determines if a directory exists
*
*
* @param container
* container where the directory resides
* @param directory
@ -175,7 +178,7 @@ public interface BlobStore {
/**
* Creates a folder or a directory marker depending on the service
*
*
* @param container
* container to create the directory in
* @param directory
@ -185,7 +188,7 @@ public interface BlobStore {
/**
* Deletes a folder or a directory marker depending on the service
*
*
* @param container
* container to delete the directory from
* @param directory
@ -195,7 +198,7 @@ public interface BlobStore {
/**
* Determines if a blob exists
*
*
* @param container
* container where the blob resides
* @param directory
@ -205,7 +208,7 @@ public interface BlobStore {
/**
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
*
*
* @param container
* container to place the blob.
* @param blob
@ -221,7 +224,7 @@ public interface BlobStore {
/**
* Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name}
* options using multipart strategies.
*
*
* @param container
* container to place the blob.
* @param blob
@ -249,7 +252,7 @@ public interface BlobStore {
/**
* Retrieves the metadata of a {@code Blob} at location {@code container/name}
*
*
* @param container
* container where this exists.
* @param name
@ -263,7 +266,7 @@ public interface BlobStore {
/**
* Retrieves a {@code Blob} representing the data at location {@code container/name}
*
*
* @param container
* container where this exists.
* @param name
@ -277,7 +280,7 @@ public interface BlobStore {
/**
* Retrieves a {@code Blob} representing the data at location {@code container/name}
*
*
* @param container
* container where this exists.
* @param name
@ -293,7 +296,7 @@ public interface BlobStore {
/**
* Deletes a {@code Blob} representing the data at location {@code container/name}
*
*
* @param container
* container where this exists.
* @param name
@ -359,4 +362,16 @@ public interface BlobStore {
@Beta
int getMaximumNumberOfParts();
/**
 * Downloads the blob {@code container/name} to the given local file,
 * using a provider-default executor for any parallel range downloads.
 */
@Beta
void downloadBlob(String container, String name, File destination);
/**
 * Downloads the blob {@code container/name} to the given local file,
 * running any parallel range downloads on the supplied executor.
 */
@Beta
void downloadBlob(String container, String name, File destination, ExecutorService executor);
/**
 * Streams the blob {@code container/name} through the returned
 * {@link InputStream}, using a provider-default executor for any parallel
 * range downloads. The caller must consume and close the stream.
 */
@Beta
InputStream streamBlob(String container, String name);
/**
 * Streams the blob {@code container/name} through the returned
 * {@link InputStream}, running any parallel range downloads on the supplied
 * executor. The caller must consume and close the stream.
 */
@Beta
InputStream streamBlob(String container, String name, ExecutorService executor);
}

View File

@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import javax.inject.Inject;
@ -942,10 +943,32 @@ public final class LocalBlobStore implements BlobStore {
return Integer.MAX_VALUE;
}
@Override
// Parallel file download is not implemented for the local blobstore;
// callers should use getBlob instead.
public void downloadBlob(String container, String name, File destination) {
throw new UnsupportedOperationException();
}
@Override
// Parallel file download is not implemented for the local blobstore;
// callers should use getBlob instead.
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
throw new UnsupportedOperationException();
}
@Override
// Parallel streaming is not implemented for the local blobstore;
// callers should use getBlob instead.
public InputStream streamBlob(String container, String name) {
throw new UnsupportedOperationException();
}
@Override
// Parallel streaming is not implemented for the local blobstore;
// callers should use getBlob instead.
public InputStream streamBlob(String container, String name, ExecutorService executor) {
throw new UnsupportedOperationException();
}
// Wraps the tag in double quotes when it carries none at all; a tag that
// already begins or ends with a quote is returned untouched.
private static String maybeQuoteETag(String eTag) {
   boolean unquoted = !eTag.startsWith("\"") && !eTag.endsWith("\"");
   return unquoted ? "\"" + eTag + "\"" : eTag;
}
}

View File

@ -22,13 +22,15 @@ import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import static org.jclouds.util.Predicates2.retry;
import java.io.InputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
@ -400,4 +402,24 @@ public abstract class BaseBlobStore implements BlobStore {
}
return eTag;
}
@Override
// Default stub: providers with efficient parallel range downloads
// (e.g. RegionScopedSwiftBlobStore) override this.
public void downloadBlob(String container, String name, File destination) {
throw new UnsupportedOperationException("Operation not supported yet");
}
@Override
// Default stub: providers with efficient parallel range downloads
// (e.g. RegionScopedSwiftBlobStore) override this.
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
throw new UnsupportedOperationException("Operation not supported yet");
}
@Override
// Default stub: providers with efficient parallel range streaming
// (e.g. RegionScopedSwiftBlobStore) override this.
public InputStream streamBlob(String container, String name) {
throw new UnsupportedOperationException("Operation not supported yet");
}
@Override
// Default stub: providers with efficient parallel range streaming
// (e.g. RegionScopedSwiftBlobStore) override this.
public InputStream streamBlob(String container, String name, ExecutorService executor) {
throw new UnsupportedOperationException("Operation not supported yet");
}
}

View File

@ -17,7 +17,10 @@
package org.jclouds.blobstore.util;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
@ -165,4 +168,24 @@ public final class ReadOnlyBlobStore extends ForwardingBlobStore {
public List<MultipartUpload> listMultipartUploads(String container) {
throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
// NOTE(review): downloadBlob is a read operation, so delegating might be more
// appropriate here — kept throwing for now, with the same message the other
// stubs in this decorator use (see listMultipartUploads above).
public void downloadBlob(String container, String name, File destination) {
   throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
// Message added for consistency with the other stubs in this decorator.
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
   throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
// Message added for consistency with the other stubs in this decorator.
public InputStream streamBlob(String container, String name) {
   throw new UnsupportedOperationException("Read-only BlobStore");
}
@Override
// Message added for consistency with the other stubs in this decorator.
public InputStream streamBlob(String container, String name, ExecutorService executor) {
   throw new UnsupportedOperationException("Read-only BlobStore");
}
}

View File

@ -19,6 +19,7 @@ package org.jclouds.azureblob.blobstore;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.azure.storage.options.ListOptions.Builder.includeMetadata;
import java.io.InputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
@ -130,7 +131,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#bucketExists}
*
*
* @param container
* container name
*/
@ -141,7 +142,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#putBucketInRegion}
*
*
* @param location
* currently ignored
* @param container
@ -154,7 +155,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#listBlobs}
*
*
* @param container
* container name
*/
@ -166,7 +167,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#deleteContainer}
*
*
* @param container
* container name
*/
@ -177,7 +178,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#blobExists}
*
*
* @param container
* container name
* @param key
@ -190,7 +191,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#getBlob}
*
*
* @param container
* container name
* @param key
@ -205,7 +206,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#putObject}
*
*
* @param container
* container name
* @param blob
@ -218,7 +219,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#putObject}
*
*
* @param container
* container name
* @param blob
@ -298,7 +299,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#deleteObject}
*
*
* @param container
* container name
* @param key
@ -336,7 +337,7 @@ public class AzureBlobStore extends BaseBlobStore {
/**
* This implementation invokes {@link AzureBlobClient#getBlobProperties}
*
*
* @param container
* container name
* @param key
@ -498,4 +499,9 @@ public class AzureBlobStore extends BaseBlobStore {
public int getMaximumNumberOfParts() {
return 50 * 1000;
}
@Override
// Parallel/ranged streaming is not implemented for Azure; use getBlob instead.
public InputStream streamBlob(String container, String name) {
throw new UnsupportedOperationException("Azure does not support streaming a blob");
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.rackspace.cloudfiles.us.blobstore.integration;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContext;
import org.jclouds.rackspace.cloudfiles.v1.blobstore.CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest;
import org.testng.annotations.Test;
// Runs the Cloud Files parallel upload/download/stream live tests against the
// US provider, in the DFW region.
@Test(groups = "live", testName = "CloudFilesUSBlobIntegrationLiveTest")
public class CloudFilesUSBlobIntegrationParallelLiveTest extends CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest {
public CloudFilesUSBlobIntegrationParallelLiveTest() {
provider = "rackspace-cloudfiles-us";
}
@Override
protected BlobStore getBlobStore() {
RegionScopedBlobStoreContext ctx = RegionScopedBlobStoreContext.class.cast(view);
return ctx.getBlobStore("DFW");
}
}
}