Merge pull request #616 from andrewgaul/transient-storage-strategy-more

Delegate blob storage to TransientStorageStrategy
This commit is contained in:
Adrian Cole 2012-05-08 16:51:55 -07:00
commit 47ad9e2bac
5 changed files with 135 additions and 100 deletions

View File

@ -163,8 +163,7 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
SortedSet<StorageMetadata> contents = newTreeSet(transform(blobBelongingToContainer,
new Function<String, StorageMetadata>() {
public StorageMetadata apply(String key) {
Blob oldBlob = loadFileBlob(container, key);
Blob oldBlob = loadBlob(container, key);
checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of "
+ container);
checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata");
@ -342,13 +341,6 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
return immediateFuture(result);
}
public String getFirstQueryOrNull(String string, @Nullable HttpRequestOptions options) {
if (options == null)
return null;
Collection<String> values = options.buildQueryParameters().get(string);
return (values != null && values.size() >= 1) ? values.iterator().next() : null;
}
/**
* Load the blob with the given key belonging to the container with the given
* name. There must exist a resource on the file system whose complete name
@ -361,7 +353,7 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
*
* @return the blob belonging to the given container with the given key
*/
private Blob loadFileBlob(String container, String key) {
private Blob loadBlob(final String container, final String key) {
logger.debug("Opening blob in container: %s - %s", container, key);
BlobBuilder builder = blobUtils.blobBuilder();
builder.name(key);
@ -527,7 +519,7 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
return immediateFuture(null);
}
Blob blob = loadFileBlob(containerName, key);
Blob blob = loadBlob(containerName, key);
if (options != null) {
if (options.getIfMatch() != null) {

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
@ -37,6 +38,7 @@ import org.jclouds.blobstore.TransientAsyncBlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.concurrent.Futures;
@ -55,6 +57,7 @@ import org.jclouds.openstack.swift.domain.ObjectInfo;
import org.jclouds.openstack.swift.domain.SwiftObject;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
@ -74,18 +77,16 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient {
private final ResourceToObjectInfo blob2ObjectInfo;
private final ListContainerOptionsToBlobStoreListContainerOptions container2ContainerListOptions;
private final ResourceToObjectList resource2ObjectList;
private final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs;
private final ExecutorService service;
@Inject
private StubSwiftAsyncClient(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
TransientAsyncBlobStore blobStore, ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs,
TransientAsyncBlobStore blobStore,
SwiftObject.Factory objectProvider, HttpGetOptionsListToGetOptions httpGetOptionsConverter,
ObjectToBlob object2Blob, BlobToObject blob2Object, ResourceToObjectInfo blob2ObjectInfo,
ListContainerOptionsToBlobStoreListContainerOptions container2ContainerListOptions,
ResourceToObjectList resource2ContainerList) {
this.service = service;
this.containerToBlobs = containerToBlobs;
this.blobStore = blobStore;
this.objectProvider = objectProvider;
this.httpGetOptionsConverter = httpGetOptionsConverter;
@ -145,10 +146,18 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient {
public ListenableFuture<? extends Set<ContainerMetadata>> listContainers(
org.jclouds.openstack.swift.options.ListContainerOptions... options) {
return immediateFuture(Sets.newHashSet(Iterables.transform(blobStore.getContainerToBlobs().keySet(),
new Function<String, ContainerMetadata>() {
public ContainerMetadata apply(String name) {
return new ContainerMetadata(name, -1, -1, null, new HashMap<String,String>());
PageSet<? extends StorageMetadata> listing;
try {
listing = blobStore.list().get();
} catch (ExecutionException ee) {
throw Throwables.propagate(ee);
} catch (InterruptedException ie) {
throw Throwables.propagate(ie);
}
return immediateFuture(Sets.newHashSet(Iterables.transform(listing,
new Function<StorageMetadata, ContainerMetadata>() {
public ContainerMetadata apply(StorageMetadata md) {
return new ContainerMetadata(md.getName(), -1, -1, null, new HashMap<String,String>());
}
})));
}
@ -186,7 +195,7 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient {
@Override
public ListenableFuture<Boolean> objectExists(String bucketName, String key) {
return immediateFuture(containerToBlobs.get(bucketName).containsKey(key));
return blobStore.blobExists(bucketName, key);
}
}

View File

@ -122,9 +122,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
protected final DateService dateService;
protected final Crypto crypto;
protected final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs;
protected final Provider<UriBuilder> uriBuilders;
protected final ConcurrentMap<String, Location> containerToLocation;
protected final HttpGetOptionsListToGetOptions httpGetOptionsConverter;
protected final IfDirectoryReturnNameStrategy ifDirectoryReturnName;
protected final Factory blobFactory;
@ -139,21 +137,15 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations,
Factory blobFactory,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs, Provider<UriBuilder> uriBuilders,
ConcurrentMap<String, Location> containerToLocation) {
Factory blobFactory, Provider<UriBuilder> uriBuilders) {
super(context, blobUtils, service, defaultLocation, locations);
this.blobFactory = blobFactory;
this.dateService = dateService;
this.crypto = crypto;
this.containerToBlobs = containerToBlobs;
this.uriBuilders = uriBuilders;
this.containerToLocation = containerToLocation;
this.httpGetOptionsConverter = httpGetOptionsConverter;
this.ifDirectoryReturnName = ifDirectoryReturnName;
getContainerToLocation().put("stub", defaultLocation.get());
getContainerToBlobs().put("stub", new ConcurrentHashMap<String, Blob>());
this.storageStrategy = new TransientStorageStrategy();
this.storageStrategy = new TransientStorageStrategy(defaultLocation);
}
/**
@ -161,16 +153,18 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
*/
@Override
public ListenableFuture<PageSet<? extends StorageMetadata>> list(final String container, ListContainerOptions options) {
final Map<String, Blob> realContents = getContainerToBlobs().get(container);
// Check if the container exists
if (realContents == null)
if (!storageStrategy.containerExists(container))
return immediateFailedFuture(cnfe(container));
SortedSet<StorageMetadata> contents = newTreeSet(transform(realContents.keySet(),
// Loading blobs from container
Iterable<String> blobBelongingToContainer = storageStrategy.getBlobKeysInsideContainer(container);
SortedSet<StorageMetadata> contents = newTreeSet(transform(blobBelongingToContainer,
new Function<String, StorageMetadata>() {
public StorageMetadata apply(String key) {
Blob oldBlob = realContents.get(key);
Blob oldBlob = loadBlob(container, key);
checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of "
+ container);
checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata");
@ -297,7 +291,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
*/
@Override
public ListenableFuture<Void> clearContainer(final String container) {
getContainerToBlobs().get(container).clear();
storageStrategy.clearContainer(container);
return immediateFuture(null);
}
@ -313,7 +307,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
public ListenableFuture<Boolean> deleteContainerIfEmpty(final String container) {
Boolean returnVal = true;
if (storageStrategy.containerExists(container)) {
if (getContainerToBlobs().get(container).size() == 0)
if (Iterables.isEmpty(storageStrategy.getBlobKeysInsideContainer(container)))
storageStrategy.deleteContainer(container);
else
returnVal = false;
@ -342,7 +336,7 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
MutableStorageMetadata cmd = create();
cmd.setName(name);
cmd.setType(StorageType.CONTAINER);
cmd.setLocation(getContainerToLocation().get(name));
cmd.setLocation(storageStrategy.getLocation(name));
return cmd;
}
}), null));
@ -356,20 +350,15 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
* {@inheritDoc}
*/
@Override
public ListenableFuture<Boolean> createContainerInLocation(final Location location, final String name) {
if (storageStrategy.containerExists(name)) {
return immediateFuture(Boolean.FALSE);
}
getContainerToBlobs().put(name, new ConcurrentHashMap<String, Blob>());
getContainerToLocation().put(name, location != null ? location : defaultLocation.get());
return immediateFuture(Boolean.TRUE);
public ListenableFuture<Boolean> createContainerInLocation(final Location location,
final String name) {
boolean result = storageStrategy.createContainerInLocation(name, location);
return immediateFuture(result);
}
public String getFirstQueryOrNull(String string, @Nullable HttpRequestOptions options) {
if (options == null)
return null;
Collection<String> values = options.buildQueryParameters().get(string);
return (values != null && values.size() >= 1) ? values.iterator().next() : null;
private Blob loadBlob(final String container, final String key) {
logger.debug("Opening blob in container: %s - %s", container, key);
return storageStrategy.getBlob(container, key);
}
protected static class DelimiterFilter implements Predicate<StorageMetadata> {
@ -477,22 +466,21 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
public ListenableFuture<String> putBlob(String containerName, Blob blob) {
checkNotNull(containerName, "containerName must be set");
checkNotNull(blob, "blob must be set");
ConcurrentMap<String, Blob> container = getContainerToBlobs().get(containerName);
String blobKey = blob.getMetadata().getName();
logger.debug("Put blob with key [%s] to container [%s]", blobKey, containerName);
if (container == null) {
if (!storageStrategy.containerExists(containerName)) {
return Futures.immediateFailedFuture(new IllegalStateException("containerName not found: " + containerName));
}
blob = createUpdatedCopyOfBlobInContainer(containerName, blob);
container.put(blob.getMetadata().getName(), blob);
storageStrategy.putBlob(containerName, blob);
return immediateFuture(Iterables.getOnlyElement(blob.getAllHeaders().get(HttpHeaders.ETAG)));
}
protected Blob createUpdatedCopyOfBlobInContainer(String containerName, Blob in) {
private Blob createUpdatedCopyOfBlobInContainer(String containerName, Blob in) {
checkNotNull(in, "blob");
checkNotNull(in.getPayload(), "blob.payload");
ByteArrayPayload payload = (in.getPayload() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(in
@ -558,13 +546,12 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return immediateFailedFuture(cnfe(containerName));
}
// If the blob doesn't exist, a null object is returned
Map<String, Blob> realContents = getContainerToBlobs().get(containerName);
if (!realContents.containsKey(key)) {
if (!storageStrategy.blobExists(containerName, key)) {
logger.debug("Item %s does not exist in container %s", key, containerName);
return immediateFuture(null);
}
Blob blob = realContents.get(key);
Blob blob = loadBlob(containerName, key);
if (options != null) {
if (options.getIfMatch() != null) {
@ -652,20 +639,12 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return returnVal;
}
public ConcurrentMap<String, ConcurrentMap<String, Blob>> getContainerToBlobs() {
return containerToBlobs;
}
@Override
protected boolean deleteAndVerifyContainerGone(final String container) {
storageStrategy.deleteContainer(container);
return storageStrategy.containerExists(container);
}
private ConcurrentMap<String, Location> getContainerToLocation() {
return containerToLocation;
}
@Override
public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
// TODO implement options
@ -679,29 +658,4 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
throw new UnsupportedOperationException("publicRead");
return createContainerInLocation(location, container);
}
private class TransientStorageStrategy {
public Iterable<String> getAllContainerNames() {
return getContainerToBlobs().keySet();
}
public boolean containerExists(final String containerName) {
return getContainerToBlobs().containsKey(containerName);
}
public void deleteContainer(final String containerName) {
getContainerToBlobs().remove(containerName);
}
public boolean blobExists(final String containerName, final String blobName) {
Map<String, Blob> map = containerToBlobs.get(containerName);
return map != null && map.containsKey(blobName);
}
public void removeBlob(final String containerName, final String blobName) {
if (storageStrategy.containerExists(containerName)) {
getContainerToBlobs().get(containerName).remove(blobName);
}
}
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.blobstore;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.domain.Location;
public class TransientStorageStrategy {
private final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs = new ConcurrentHashMap<String, ConcurrentMap<String, Blob>>();
private final ConcurrentMap<String, Location> containerToLocation = new ConcurrentHashMap<String, Location>();
private final Supplier<Location> defaultLocation;
public TransientStorageStrategy(final Supplier<Location> defaultLocation) {
this.defaultLocation = Preconditions.checkNotNull(defaultLocation);
}
public Iterable<String> getAllContainerNames() {
return containerToBlobs.keySet();
}
public boolean containerExists(final String containerName) {
return containerToBlobs.containsKey(containerName);
}
public void clearContainer(final String containerName) {
containerToBlobs.get(containerName).clear();
}
public boolean createContainerInLocation(final String containerName, final Location location) {
ConcurrentMap<String, Blob> origValue = containerToBlobs.putIfAbsent(
containerName, new ConcurrentHashMap<String, Blob>());
if (origValue != null) {
return false;
}
containerToLocation.put(containerName, location != null ? location : defaultLocation.get());
return true;
}
public void deleteContainer(final String containerName) {
containerToBlobs.remove(containerName);
}
public boolean blobExists(final String containerName, final String blobName) {
Map<String, Blob> map = containerToBlobs.get(containerName);
return map != null && map.containsKey(blobName);
}
public Blob getBlob(final String containerName, final String blobName) {
Map<String, Blob> map = containerToBlobs.get(containerName);
return map == null ? null : map.get(blobName);
}
public void putBlob(final String containerName, final Blob blob) {
Map<String, Blob> map = containerToBlobs.get(containerName);
map.put(blob.getMetadata().getName(), blob);
}
public void removeBlob(final String containerName, final String blobName) {
containerToBlobs.get(containerName).remove(blobName);
}
public Iterable<String> getBlobKeysInsideContainer(final String containerName) {
return containerToBlobs.get(containerName).keySet();
}
public Location getLocation(final String containerName) {
return containerToLocation.get(containerName);
}
}

View File

@ -18,9 +18,6 @@
*/
package org.jclouds.blobstore.config;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobRequestSigner;
import org.jclouds.blobstore.BlobStore;
@ -41,20 +38,11 @@ import com.google.inject.TypeLiteral;
* @author Adrian Cole
*/
public class TransientBlobStoreContextModule extends AbstractModule {
// must be singleton for all threads and all objects or tests may fail;
static final ConcurrentHashMap<String, ConcurrentMap<String, Blob>> map = new ConcurrentHashMap<String, ConcurrentMap<String, Blob>>();
static final ConcurrentHashMap<String, Location> containerToLocation = new ConcurrentHashMap<String, Location>();
@Override
protected void configure() {
bind(AsyncBlobStore.class).to(TransientAsyncBlobStore.class).asEagerSingleton();
// forward all requests from TransientBlobStore to TransientAsyncBlobStore. needs above binding as cannot proxy a class
BinderUtils.bindClient(binder(), TransientBlobStore.class, AsyncBlobStore.class, ImmutableMap.<Class<?>, Class<?>>of());
bind(new TypeLiteral<ConcurrentMap<String, ConcurrentMap<String, Blob>>>() {
}).toInstance(map);
bind(new TypeLiteral<ConcurrentMap<String, Location>>() {
}).toInstance(containerToLocation);
install(new BlobStoreObjectModule());
install(new BlobStoreMapModule());
bind(BlobStore.class).to(TransientBlobStore.class);