Reduce filesystem and transient differences

These providers have a similar lineage but many gratuitous
differences.  This commit reduces the diff between them and is a
prerequisite for upcoming changes to make them more similar to each
other and real providers.  Some future commit might unify these in
some smarter way, e.g., having a TransientStrategy to match
FilesystemStrategy.
This commit is contained in:
Andrew Gaul 2012-05-07 14:16:03 -07:00 committed by Andrew Gaul
parent 596cf4e044
commit 3a0c15b345
2 changed files with 228 additions and 212 deletions

View File

@ -26,8 +26,6 @@ import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.find; import static com.google.common.collect.Iterables.find;
import static com.google.common.collect.Iterables.size; import static com.google.common.collect.Iterables.size;
import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.partition;
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.filter; import static com.google.common.collect.Sets.filter;
import static com.google.common.collect.Sets.newTreeSet; import static com.google.common.collect.Sets.newTreeSet;
@ -48,10 +46,10 @@ import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -126,10 +124,14 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
protected final FilesystemStorageStrategy storageStrategy; protected final FilesystemStorageStrategy storageStrategy;
@Inject @Inject
protected FilesystemAsyncBlobStore(BlobStoreContext context, DateService dateService, Crypto crypto, protected FilesystemAsyncBlobStore(BlobStoreContext context,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, IfDirectoryReturnNameStrategy ifDirectoryReturnName, DateService dateService, Crypto crypto,
BlobUtils blobUtils, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, HttpGetOptionsListToGetOptions httpGetOptionsConverter,
Supplier<Location> defaultLocation, @Memoized Supplier<Set<? extends Location>> locations, IfDirectoryReturnNameStrategy ifDirectoryReturnName,
BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations,
FilesystemStorageStrategy storageStrategy) { FilesystemStorageStrategy storageStrategy) {
super(context, blobUtils, service, defaultLocation, locations); super(context, blobUtils, service, defaultLocation, locations);
this.dateService = dateService; this.dateService = dateService;
@ -146,9 +148,8 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
public ListenableFuture<PageSet<? extends StorageMetadata>> list(final String container, ListContainerOptions options) { public ListenableFuture<PageSet<? extends StorageMetadata>> list(final String container, ListContainerOptions options) {
// Check if the container exists // Check if the container exists
if (!containerExistsSyncImpl(container)) { if (!containerExistsSyncImpl(container))
return immediateFailedFuture(cnfe(container)); return immediateFailedFuture(cnfe(container));
}
// Loading blobs from container // Loading blobs from container
Iterable<String> blobBelongingToContainer = null; Iterable<String> blobBelongingToContainer = null;
@ -183,11 +184,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
final String finalMarker = options.getMarker(); final String finalMarker = options.getMarker();
StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() { StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() {
public boolean apply(StorageMetadata metadata) { public boolean apply(StorageMetadata metadata) {
return metadata.getName().equals(finalMarker); return metadata.getName().compareTo(finalMarker) > 0;
} }
}); });
contents = contents.tailSet(lastMarkerMetadata); contents = contents.tailSet(lastMarkerMetadata);
contents.remove(lastMarkerMetadata);
} }
final String prefix = options.getDir(); final String prefix = options.getDir();
@ -199,30 +199,26 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
})); }));
} }
Integer maxResults = options.getMaxResults() != null ? options.getMaxResults() : 1000; int maxResults = options.getMaxResults() != null ? options.getMaxResults() : 1000;
if (contents.size() > 0) { if (!contents.isEmpty()) {
SortedSet<StorageMetadata> contentsSlice = firstSliceOfSize(contents, maxResults); StorageMetadata lastElement = contents.last();
if (!contentsSlice.contains(contents.last())) { contents = newTreeSet(Iterables.limit(contents, maxResults));
if (!contents.contains(lastElement)) {
// Partial listing // Partial listing
marker = contentsSlice.last().getName(); marker = contents.last().getName();
} else {
marker = null;
} }
contents = contentsSlice;
} }
final String delimiter = options.isRecursive() ? null : File.separator; final String delimiter = options.isRecursive() ? null : File.separator;
if (delimiter != null) { if (delimiter != null) {
SortedSet<String> commonPrefixes = null; SortedSet<String> commonPrefixes = newTreeSet(
Iterable<String> iterable = transform(contents, new CommonPrefixes(prefix != null ? prefix : null, transform(contents, new CommonPrefixes(prefix, delimiter)));
delimiter));
commonPrefixes = iterable != null ? newTreeSet(iterable) : new TreeSet<String>();
commonPrefixes.remove(CommonPrefixes.NO_PREFIX); commonPrefixes.remove(CommonPrefixes.NO_PREFIX);
contents = newTreeSet(filter(contents, new DelimiterFilter(prefix != null ? prefix : null, delimiter))); contents = newTreeSet(filter(contents, new DelimiterFilter(prefix, delimiter)));
Iterables.<StorageMetadata> addAll(contents, Iterables.<StorageMetadata> addAll(contents, transform(commonPrefixes,
transform(commonPrefixes, new Function<String, StorageMetadata>() { new Function<String, StorageMetadata>() {
public StorageMetadata apply(String o) { public StorageMetadata apply(String o) {
MutableStorageMetadata md = new MutableStorageMetadataImpl(); MutableStorageMetadata md = new MutableStorageMetadataImpl();
md.setType(StorageType.RELATIVE_PATH); md.setType(StorageType.RELATIVE_PATH);
@ -245,7 +241,7 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
} }
private ContainerNotFoundException cnfe(String name) { private ContainerNotFoundException cnfe(final String name) {
return new ContainerNotFoundException(name, String.format("container %s not in filesystem", name)); return new ContainerNotFoundException(name, String.format("container %s not in filesystem", name));
} }
@ -284,18 +280,31 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ListenableFuture<Void> removeBlob(String container, String key) { public ListenableFuture<Void> removeBlob(final String container, final String key) {
storageStrategy.removeBlob(container, key); storageStrategy.removeBlob(container, key);
return immediateFuture(null); return immediateFuture(null);
} }
/**
* Override parent method because it uses strange futures and listenables
* that creates problem in the test if more than one test that deletes the
* container is executed
*
* @param container
* @return
*/
@Override
public ListenableFuture<Void> deleteContainer(final String container) {
deleteAndVerifyContainerGone(container);
return immediateFuture(null);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ListenableFuture<Boolean> containerExists(String containerName) { public ListenableFuture<Boolean> containerExists(final String containerName) {
boolean exists = containerExistsSyncImpl(containerName); return immediateFuture(containerExistsSyncImpl(containerName));
return immediateFuture(exists);
} }
/** /**
@ -419,11 +428,6 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
} }
} }
public static <T extends Comparable<?>> SortedSet<T> firstSliceOfSize(Iterable<T> elements, int size) {
List<List<T>> slices = partition(newArrayList(elements), size);
return newTreeSet(slices.get(0));
}
public static HttpResponseException returnResponseException(int code) { public static HttpResponseException returnResponseException(int code) {
HttpResponse response = null; HttpResponse response = null;
response = new HttpResponse(code, null, null); response = new HttpResponse(code, null, null);
@ -474,18 +478,20 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ListenableFuture<String> putBlob(String containerName, Blob object) { public ListenableFuture<String> putBlob(String containerName, Blob blob) {
String blobKey = object.getMetadata().getName(); checkNotNull(containerName, "containerName must be set");
checkNotNull(blob, "blob must be set");
String blobKey = blob.getMetadata().getName();
logger.debug("Put object with key [%s] to container [%s]", blobKey, containerName); logger.debug("Put blob with key [%s] to container [%s]", blobKey, containerName);
String eTag = getEtag(object); String eTag = getEtag(blob);
try { try {
// TODO // TODO
// must override existing file? // must override existing file?
storageStrategy.writePayloadOnFile(containerName, blobKey, object.getPayload()); storageStrategy.writePayloadOnFile(containerName, blobKey, blob.getPayload());
} catch (IOException e) { } catch (IOException e) {
logger.error(e, "An error occurred storing the new object with name [%s] to container [%s].", blobKey, logger.error(e, "An error occurred storing the new blob with name [%s] to container [%s].", blobKey,
containerName); containerName);
Throwables.propagate(e); Throwables.propagate(e);
} }
@ -584,10 +590,10 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) { public ListenableFuture<BlobMetadata> blobMetadata(final String container, final String key) {
try { try {
Blob blob = getBlob(container, key).get(); Blob blob = getBlob(container, key).get();
return Futures.<BlobMetadata> immediateFuture(blob != null ? blob.getMetadata() : null); return immediateFuture(blob != null ? (BlobMetadata) blob.getMetadata() : null);
} catch (Exception e) { } catch (Exception e) {
if (size(filter(getCausalChain(e), KeyNotFoundException.class)) >= 1) if (size(filter(getCausalChain(e), KeyNotFoundException.class)) >= 1)
return immediateFuture(null); return immediateFuture(null);
@ -595,26 +601,6 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
} }
} }
@Override
protected boolean deleteAndVerifyContainerGone(String container) {
storageStrategy.deleteContainer(container);
return containerExistsSyncImpl(container);
}
/**
* Override parent method because it uses strange futures and listenables
* that creates problem in the test if more than one test that deletes the
* container is executed
*
* @param container
* @return
*/
@Override
public ListenableFuture<Void> deleteContainer(String container) {
deleteAndVerifyContainerGone(container);
return immediateFuture(null);
}
/** /**
* Each container is a directory, so in order to check if a container exists * Each container is a directory, so in order to check if a container exists
* the corresponding directory must exists. Synchronous implementation * the corresponding directory must exists. Synchronous implementation
@ -627,7 +613,6 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
} }
/** /**
*
* Calculates the object MD5 and returns it as eTag * Calculates the object MD5 and returns it as eTag
* *
* @param object * @param object
@ -645,6 +630,12 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
return eTag; return eTag;
} }
@Override
protected boolean deleteAndVerifyContainerGone(final String container) {
storageStrategy.deleteContainer(container);
return containerExistsSyncImpl(container);
}
@Override @Override
public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) { public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
// TODO implement options // TODO implement options

View File

@ -18,7 +18,6 @@
*/ */
package org.jclouds.blobstore; package org.jclouds.blobstore;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.getCausalChain; import static com.google.common.base.Throwables.getCausalChain;
@ -55,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import javax.inject.Provider; import javax.inject.Provider;
@ -98,6 +98,7 @@ import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.ByteArrayPayload; import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.DelegatingPayload; import org.jclouds.io.payloads.DelegatingPayload;
import org.jclouds.javax.annotation.Nullable; import org.jclouds.javax.annotation.Nullable;
import org.jclouds.logging.Logger;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
@ -116,6 +117,9 @@ import com.google.common.util.concurrent.ListenableFuture;
*/ */
public class TransientAsyncBlobStore extends BaseAsyncBlobStore { public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Resource
protected Logger logger = Logger.NULL;
protected final DateService dateService; protected final DateService dateService;
protected final Crypto crypto; protected final Crypto crypto;
protected final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs; protected final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs;
@ -126,13 +130,17 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
protected final Factory blobFactory; protected final Factory blobFactory;
@Inject @Inject
protected TransientAsyncBlobStore(BlobStoreContext context, DateService dateService, Crypto crypto, protected TransientAsyncBlobStore(BlobStoreContext context,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs, Provider<UriBuilder> uriBuilders, DateService dateService, Crypto crypto,
ConcurrentMap<String, Location> containerToLocation, HttpGetOptionsListToGetOptions httpGetOptionsConverter,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, IfDirectoryReturnNameStrategy ifDirectoryReturnName,
IfDirectoryReturnNameStrategy ifDirectoryReturnName, Factory blobFactory, BlobUtils blobUtils, BlobUtils blobUtils,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
@Memoized Supplier<Set<? extends Location>> locations) { Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations,
Factory blobFactory,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs, Provider<UriBuilder> uriBuilders,
ConcurrentMap<String, Location> containerToLocation) {
super(context, blobUtils, service, defaultLocation, locations); super(context, blobUtils, service, defaultLocation, locations);
this.blobFactory = blobFactory; this.blobFactory = blobFactory;
this.dateService = dateService; this.dateService = dateService;
@ -153,65 +161,67 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
public ListenableFuture<PageSet<? extends StorageMetadata>> list(final String container, ListContainerOptions options) { public ListenableFuture<PageSet<? extends StorageMetadata>> list(final String container, ListContainerOptions options) {
final Map<String, Blob> realContents = getContainerToBlobs().get(container); final Map<String, Blob> realContents = getContainerToBlobs().get(container);
// Check if the container exists
if (realContents == null) if (realContents == null)
return immediateFailedFuture(cnfe(container)); return immediateFailedFuture(cnfe(container));
SortedSet<StorageMetadata> contents = newTreeSet(transform(realContents.keySet(), SortedSet<StorageMetadata> contents = newTreeSet(transform(realContents.keySet(),
new Function<String, StorageMetadata>() { new Function<String, StorageMetadata>() {
public StorageMetadata apply(String key) { public StorageMetadata apply(String key) {
Blob oldBlob = realContents.get(key); Blob oldBlob = realContents.get(key);
checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of " checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of "
+ container); + container);
checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata"); checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata");
MutableBlobMetadata md = copy(oldBlob.getMetadata()); MutableBlobMetadata md = copy(oldBlob.getMetadata());
String directoryName = ifDirectoryReturnName.execute(md); String directoryName = ifDirectoryReturnName.execute(md);
if (directoryName != null) { if (directoryName != null) {
md.setName(directoryName); md.setName(directoryName);
md.setType(StorageType.RELATIVE_PATH); md.setType(StorageType.RELATIVE_PATH);
}
return md;
} }
})); return md;
}
if (options.getMarker() != null) { }));
final String finalMarker = options.getMarker();
StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() {
public boolean apply(StorageMetadata metadata) {
return metadata.getName().compareTo(finalMarker) > 0;
}
});
contents = contents.tailSet(lastMarkerMetadata);
}
final String prefix = options.getDir();
if (prefix != null) {
contents = newTreeSet(filter(contents, new Predicate<StorageMetadata>() {
public boolean apply(StorageMetadata o) {
return (o != null && o.getName().startsWith(prefix) && !o.getName().equals(prefix));
}
}));
}
String marker = null; String marker = null;
int maxResults = options.getMaxResults() != null ? options.getMaxResults() : 1000; if (options != null) {
if (!contents.isEmpty()) { if (options.getMarker() != null) {
StorageMetadata lastElement = contents.last(); final String finalMarker = options.getMarker();
contents = newTreeSet(Iterables.limit(contents, maxResults)); StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() {
if (!contents.contains(lastElement)) { public boolean apply(StorageMetadata metadata) {
// Partial listing return metadata.getName().compareTo(finalMarker) > 0;
marker = contents.last().getName(); }
});
contents = contents.tailSet(lastMarkerMetadata);
} }
}
final String delimiter = options.isRecursive() ? null : "/"; final String prefix = options.getDir();
if (delimiter != null) { if (prefix != null) {
SortedSet<String> commonPrefixes = newTreeSet( contents = newTreeSet(filter(contents, new Predicate<StorageMetadata>() {
transform(contents, new CommonPrefixes(prefix, delimiter))); public boolean apply(StorageMetadata o) {
commonPrefixes.remove(CommonPrefixes.NO_PREFIX); return (o != null && o.getName().startsWith(prefix) && !o.getName().equals(prefix));
}
}));
}
contents = newTreeSet(filter(contents, new DelimiterFilter(prefix, delimiter))); int maxResults = options.getMaxResults() != null ? options.getMaxResults() : 1000;
if (!contents.isEmpty()) {
StorageMetadata lastElement = contents.last();
contents = newTreeSet(Iterables.limit(contents, maxResults));
if (!contents.contains(lastElement)) {
// Partial listing
marker = contents.last().getName();
}
}
Iterables.<StorageMetadata> addAll(contents, transform(commonPrefixes, final String delimiter = options.isRecursive() ? null : "/";
if (delimiter != null) {
SortedSet<String> commonPrefixes = newTreeSet(
transform(contents, new CommonPrefixes(prefix, delimiter)));
commonPrefixes.remove(CommonPrefixes.NO_PREFIX);
contents = newTreeSet(filter(contents, new DelimiterFilter(prefix, delimiter)));
Iterables.<StorageMetadata> addAll(contents, transform(commonPrefixes,
new Function<String, StorageMetadata>() { new Function<String, StorageMetadata>() {
public StorageMetadata apply(String o) { public StorageMetadata apply(String o) {
MutableStorageMetadata md = new MutableStorageMetadataImpl(); MutableStorageMetadata md = new MutableStorageMetadataImpl();
@ -220,17 +230,18 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return md; return md;
} }
})); }));
} }
// trim metadata, if the response isn't supposed to be detailed. // trim metadata, if the response isn't supposed to be detailed.
if (!options.isDetailed()) { if (!options.isDetailed()) {
for (StorageMetadata md : contents) { for (StorageMetadata md : contents) {
md.getUserMetadata().clear(); md.getUserMetadata().clear();
}
} }
} }
return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(contents, return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(contents,
marker)); marker));
} }
@ -246,10 +257,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
os = new ObjectOutputStream(bout); os = new ObjectOutputStream(bout);
os.writeObject(in); os.writeObject(in);
ObjectInput is = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray())); ObjectInput is = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()));
MutableBlobMetadata out = (MutableBlobMetadata) is.readObject(); MutableBlobMetadata metadata = (MutableBlobMetadata) is.readObject();
convertUserMetadataKeysToLowercase(out); convertUserMetadataKeysToLowercase(metadata);
HttpUtils.copy(in.getContentMetadata(), out.getContentMetadata()); HttpUtils.copy(in.getContentMetadata(), metadata.getContentMetadata());
return out; return metadata;
} catch (Exception e) { } catch (Exception e) {
throw propagate(e); throw propagate(e);
} }
@ -315,8 +326,8 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ListenableFuture<Boolean> containerExists(final String container) { public ListenableFuture<Boolean> containerExists(final String containerName) {
return immediateFuture(getContainerToBlobs().containsKey(container)); return immediateFuture(getContainerToBlobs().containsKey(containerName));
} }
/** /**
@ -324,16 +335,18 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
*/ */
@Override @Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list() { public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
Iterable<String> containers = getContainerToBlobs().keySet();
return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(transform( return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(transform(
getContainerToBlobs().keySet(), new Function<String, StorageMetadata>() { containers, new Function<String, StorageMetadata>() {
public StorageMetadata apply(String name) { public StorageMetadata apply(String name) {
MutableStorageMetadata cmd = create(); MutableStorageMetadata cmd = create();
cmd.setName(name); cmd.setName(name);
cmd.setType(StorageType.CONTAINER); cmd.setType(StorageType.CONTAINER);
cmd.setLocation(getContainerToLocation().get(name)); cmd.setLocation(getContainerToLocation().get(name));
return cmd; return cmd;
} }
}), null)); }), null));
} }
protected MutableStorageMetadata create() { protected MutableStorageMetadata create() {
@ -437,10 +450,6 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return 0; return 0;
} }
public HttpRequest getCurrentRequest() {
return new HttpRequest("GET", URI.create("http://stub"));
}
public int incrementFailureCount() { public int incrementFailureCount() {
return 0; return 0;
} }
@ -449,6 +458,11 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
} }
@Override
public HttpRequest getCurrentRequest() {
return new HttpRequest("GET", URI.create("http://stub"));
}
@Override @Override
public void setCurrentRequest(HttpRequest request) { public void setCurrentRequest(HttpRequest request) {
@ -461,15 +475,18 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public ListenableFuture<String> putBlob(String containerName, Blob in) { public ListenableFuture<String> putBlob(String containerName, Blob blob) {
checkArgument(containerName != null, "containerName must be set"); checkNotNull(containerName, "containerName must be set");
checkArgument(in != null, "blob must be set"); checkNotNull(blob, "blob must be set");
ConcurrentMap<String, Blob> container = getContainerToBlobs().get(containerName); ConcurrentMap<String, Blob> container = getContainerToBlobs().get(containerName);
String blobKey = blob.getMetadata().getName();
logger.debug("Put blob with key [%s] to container [%s]", blobKey, containerName);
if (container == null) { if (container == null) {
return Futures.immediateFailedFuture(new IllegalStateException("containerName not found: " + containerName)); return Futures.immediateFailedFuture(new IllegalStateException("containerName not found: " + containerName));
} }
Blob blob = createUpdatedCopyOfBlobInContainer(containerName, in); blob = createUpdatedCopyOfBlobInContainer(containerName, blob);
container.put(blob.getMetadata().getName(), blob); container.put(blob.getMetadata().getName(), blob);
@ -536,74 +553,83 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
*/ */
@Override @Override
public ListenableFuture<Blob> getBlob(final String containerName, final String key, GetOptions options) { public ListenableFuture<Blob> getBlob(final String containerName, final String key, GetOptions options) {
if (!getContainerToBlobs().containsKey(containerName)) logger.debug("Retrieving blob with key %s from container %s", key, containerName);
// If the container doesn't exist, an exception is thrown
if (!getContainerToBlobs().containsKey(containerName)) {
logger.debug("Container %s does not exist", containerName);
return immediateFailedFuture(cnfe(containerName)); return immediateFailedFuture(cnfe(containerName));
}
// If the blob doesn't exist, a null object is returned
Map<String, Blob> realContents = getContainerToBlobs().get(containerName); Map<String, Blob> realContents = getContainerToBlobs().get(containerName);
if (!realContents.containsKey(key)) if (!realContents.containsKey(key)) {
logger.debug("Item %s does not exist in container %s", key, containerName);
return immediateFuture(null); return immediateFuture(null);
Blob object = realContents.get(key);
if (options.getIfMatch() != null) {
if (!object.getMetadata().getETag().equals(options.getIfMatch()))
return immediateFailedFuture(returnResponseException(412));
} }
if (options.getIfNoneMatch() != null) {
if (object.getMetadata().getETag().equals(options.getIfNoneMatch())) Blob blob = realContents.get(key);
return immediateFailedFuture(returnResponseException(304));
} if (options != null) {
if (options.getIfModifiedSince() != null) { if (options.getIfMatch() != null) {
Date modifiedSince = options.getIfModifiedSince(); if (!blob.getMetadata().getETag().equals(options.getIfMatch()))
if (object.getMetadata().getLastModified().before(modifiedSince)) { return immediateFailedFuture(returnResponseException(412));
HttpResponse response = new HttpResponse(304, null, null); }
return immediateFailedFuture(new HttpResponseException(String.format("%1$s is before %2$s", object if (options.getIfNoneMatch() != null) {
if (blob.getMetadata().getETag().equals(options.getIfNoneMatch()))
return immediateFailedFuture(returnResponseException(304));
}
if (options.getIfModifiedSince() != null) {
Date modifiedSince = options.getIfModifiedSince();
if (blob.getMetadata().getLastModified().before(modifiedSince)) {
HttpResponse response = new HttpResponse(304, null, null);
return immediateFailedFuture(new HttpResponseException(String.format("%1$s is before %2$s", blob
.getMetadata().getLastModified(), modifiedSince), null, response)); .getMetadata().getLastModified(), modifiedSince), null, response));
}
}
if (options.getIfUnmodifiedSince() != null) {
Date unmodifiedSince = options.getIfUnmodifiedSince();
if (object.getMetadata().getLastModified().after(unmodifiedSince)) {
HttpResponse response = new HttpResponse(412, null, null);
return immediateFailedFuture(new HttpResponseException(String.format("%1$s is after %2$s", object
.getMetadata().getLastModified(), unmodifiedSince), null, response));
}
}
Blob returnVal = copyBlob(object);
if (options.getRanges() != null && options.getRanges().size() > 0) {
byte[] data;
try {
data = toByteArray(returnVal.getPayload().getInput());
} catch (IOException e) {
return immediateFailedFuture(new RuntimeException(e));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
for (String s : options.getRanges()) {
if (s.startsWith("-")) {
int length = Integer.parseInt(s.substring(1));
out.write(data, data.length - length, length);
} else if (s.endsWith("-")) {
int offset = Integer.parseInt(s.substring(0, s.length() - 1));
out.write(data, offset, data.length - offset);
} else if (s.contains("-")) {
String[] firstLast = s.split("\\-");
int offset = Integer.parseInt(firstLast[0]);
int last = Integer.parseInt(firstLast[1]);
int length = (last < data.length) ? last + 1 : data.length - offset;
out.write(data, offset, length);
} else {
return immediateFailedFuture(new IllegalArgumentException("first and last were null!"));
} }
} }
ContentMetadata cmd = returnVal.getPayload().getContentMetadata(); if (options.getIfUnmodifiedSince() != null) {
returnVal.setPayload(out.toByteArray()); Date unmodifiedSince = options.getIfUnmodifiedSince();
HttpUtils.copy(cmd, returnVal.getPayload().getContentMetadata()); if (blob.getMetadata().getLastModified().after(unmodifiedSince)) {
returnVal.getPayload().getContentMetadata().setContentLength(new Long(out.toByteArray().length)); HttpResponse response = new HttpResponse(412, null, null);
return immediateFailedFuture(new HttpResponseException(String.format("%1$s is after %2$s", blob
.getMetadata().getLastModified(), unmodifiedSince), null, response));
}
}
blob = copyBlob(blob);
if (options.getRanges() != null && options.getRanges().size() > 0) {
byte[] data;
try {
data = toByteArray(blob.getPayload().getInput());
} catch (IOException e) {
return immediateFailedFuture(new RuntimeException(e));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
for (String s : options.getRanges()) {
if (s.startsWith("-")) {
int length = Integer.parseInt(s.substring(1));
out.write(data, data.length - length, length);
} else if (s.endsWith("-")) {
int offset = Integer.parseInt(s.substring(0, s.length() - 1));
out.write(data, offset, data.length - offset);
} else if (s.contains("-")) {
String[] firstLast = s.split("\\-");
int offset = Integer.parseInt(firstLast[0]);
int last = Integer.parseInt(firstLast[1]);
int length = (last < data.length) ? last + 1 : data.length - offset;
out.write(data, offset, length);
} else {
return immediateFailedFuture(new IllegalArgumentException("first and last were null!"));
}
}
ContentMetadata cmd = blob.getPayload().getContentMetadata();
blob.setPayload(out.toByteArray());
HttpUtils.copy(cmd, blob.getPayload().getContentMetadata());
blob.getPayload().getContentMetadata().setContentLength(new Long(out.toByteArray().length));
}
} }
checkNotNull(returnVal.getPayload(), "payload " + returnVal); checkNotNull(blob.getPayload(), "payload " + blob);
return immediateFuture(returnVal); return immediateFuture(blob);
} }
/** /**
@ -633,7 +659,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
} }
@Override @Override
protected boolean deleteAndVerifyContainerGone(String container) { protected boolean deleteAndVerifyContainerGone(final String container) {
getContainerToBlobs().remove(container); getContainerToBlobs().remove(container);
return getContainerToBlobs().containsKey(container); return getContainerToBlobs().containsKey(container);
} }
@ -650,10 +676,9 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Override @Override
public ListenableFuture<Boolean> createContainerInLocation(Location location, String container, public ListenableFuture<Boolean> createContainerInLocation(Location location, String container,
CreateContainerOptions options) { CreateContainerOptions options) {
if (options.isPublicRead()) if (options.isPublicRead())
throw new UnsupportedOperationException("publicRead"); throw new UnsupportedOperationException("publicRead");
return createContainerInLocation(location, container); return createContainerInLocation(location, container);
} }
} }