Merge pull request #738 from andrewgaul/local-blobstore-move-helper-method

Move helper method to TransientStorageStrategy
This commit is contained in:
Adrian Cole 2012-07-23 07:37:30 -07:00
commit b890765e9e
6 changed files with 108 additions and 127 deletions

View File

@ -70,9 +70,6 @@ import org.jclouds.blobstore.strategy.IfDirectoryReturnNameStrategy;
import org.jclouds.blobstore.util.BlobStoreUtils;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.crypto.Crypto;
import org.jclouds.crypto.CryptoStreams;
import org.jclouds.date.DateService;
import org.jclouds.domain.Location;
import org.jclouds.filesystem.predicates.validators.FilesystemContainerNameValidator;
import org.jclouds.http.HttpCommand;
@ -83,7 +80,6 @@ import org.jclouds.http.HttpUtils;
import org.jclouds.io.ContentMetadata;
import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.logging.Logger;
import org.jclouds.rest.annotations.ParamValidators;
@ -106,8 +102,6 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
@Resource
protected Logger logger = Logger.NULL;
protected final DateService dateService;
protected final Crypto crypto;
protected final ContentMetadataCodec contentMetadataCodec;
protected final IfDirectoryReturnNameStrategy ifDirectoryReturnName;
protected final Factory blobFactory;
@ -119,14 +113,11 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations,
DateService dateService, Crypto crypto,
ContentMetadataCodec contentMetadataCodec,
IfDirectoryReturnNameStrategy ifDirectoryReturnName,
Factory blobFactory, LocalStorageStrategy storageStrategy) {
super(context, blobUtils, service, defaultLocation, locations);
this.blobFactory = blobFactory;
this.dateService = dateService;
this.crypto = crypto;
this.contentMetadataCodec = contentMetadataCodec;
this.ifDirectoryReturnName = ifDirectoryReturnName;
this.storageStrategy = storageStrategy;
@ -419,15 +410,12 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
}
try {
storageStrategy.putBlob(containerName, blob);
return immediateFuture(storageStrategy.putBlob(containerName, blob));
} catch (IOException e) {
logger.error(e, "An error occurred storing the new blob with name [%s] to container [%s].", blobKey,
containerName);
Throwables.propagate(e);
throw Throwables.propagate(e);
}
String eTag = getEtag(blob);
return immediateFuture(eTag);
}
private void copyPayloadHeadersToBlob(Payload payload, Blob blob) {
@ -550,24 +538,6 @@ public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
}
}
/**
* Calculates the object MD5 and returns it as eTag
*
* @param object
* @return
*/
private String getEtag(Blob object) {
try {
Payloads.calculateMD5(object, crypto.md5());
} catch (IOException ex) {
logger.error(ex, "An error occurred calculating MD5 for object with name %s.", object.getMetadata().getName());
Throwables.propagate(ex);
}
String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
return eTag;
}
private Blob copyBlob(Blob blob) {
Blob returnVal = blobFactory.create(BlobStoreUtils.copy(blob.getMetadata()));
returnVal.setPayload(blob.getPayload());

View File

@ -39,12 +39,14 @@ import org.jclouds.blobstore.LocalStorageStrategy;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.crypto.Crypto;
import org.jclouds.crypto.CryptoStreams;
import org.jclouds.domain.Location;
import org.jclouds.filesystem.predicates.validators.FilesystemBlobKeyValidator;
import org.jclouds.filesystem.predicates.validators.FilesystemContainerNameValidator;
import org.jclouds.filesystem.reference.FilesystemConstants;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.logging.Logger;
import org.jclouds.rest.annotations.ParamValidators;
@ -67,17 +69,20 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
protected final String baseDirectory;
protected final FilesystemContainerNameValidator filesystemContainerNameValidator;
protected final FilesystemBlobKeyValidator filesystemBlobKeyValidator;
private final Crypto crypto;
@Inject
protected FilesystemStorageStrategyImpl(Provider<BlobBuilder> blobBuilders,
@Named(FilesystemConstants.PROPERTY_BASEDIR) String baseDir,
FilesystemContainerNameValidator filesystemContainerNameValidator,
FilesystemBlobKeyValidator filesystemBlobKeyValidator) {
FilesystemBlobKeyValidator filesystemBlobKeyValidator,
Crypto crypto) {
this.blobBuilders = checkNotNull(blobBuilders, "filesystem storage strategy blobBuilders");
this.baseDirectory = checkNotNull(baseDir, "filesystem storage strategy base directory");
this.filesystemContainerNameValidator = checkNotNull(filesystemContainerNameValidator,
"filesystem container name validator");
this.filesystemBlobKeyValidator = checkNotNull(filesystemBlobKeyValidator, "filesystem blob key validator");
this.crypto = crypto;
}
@Override
@ -209,7 +214,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
}
@Override
public void putBlob(final String containerName, final Blob blob) throws IOException {
public String putBlob(final String containerName, final Blob blob) throws IOException {
String blobKey = blob.getMetadata().getName();
Payload payload = blob.getPayload();
filesystemContainerNameValidator.validate(containerName);
@ -224,6 +229,9 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
output = new FileOutputStream(outputFile);
payload.writeTo(output);
}
Payloads.calculateMD5(payload, crypto.md5());
String eTag = CryptoStreams.hex(payload.getContentMetadata().getContentMD5());
return eTag;
} catch (IOException ex) {
if (outputFile != null) {
outputFile.delete();

View File

@ -86,7 +86,7 @@ public class FilesystemStorageStrategyImplTest {
}
}
}, TestUtils.TARGET_BASE_DIR, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl());
}, TestUtils.TARGET_BASE_DIR, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl(), new JCECrypto());
TestUtils.cleanDirectoryContent(TestUtils.TARGET_BASE_DIR);
}
@ -408,7 +408,7 @@ public class FilesystemStorageStrategyImplTest {
assertEquals(fileForPayload.getAbsolutePath(), fullPath + blobKey, "Wrong file path");
}
public void testGetFileForBlobKey_AbsolutePath() throws IOException {
public void testGetFileForBlobKey_AbsolutePath() throws Exception {
String absoluteBasePath = (new File(getAbsoluteDirectory(), "basedir")).getAbsolutePath() + FS;
String absoluteContainerPath = absoluteBasePath + CONTAINER_NAME + FS;
@ -423,7 +423,7 @@ public class FilesystemStorageStrategyImplTest {
return null;
}
}
}, absoluteBasePath, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl());
}, absoluteBasePath, new FilesystemContainerNameValidatorImpl(), new FilesystemBlobKeyValidatorImpl(), new JCECrypto());
TestUtils.cleanDirectoryContent(absoluteContainerPath);
String blobKey;

View File

@ -117,9 +117,10 @@ public interface LocalStorageStrategy {
* Write a {@link Blob} into a file
* @param container
* @param blob
* @return etag of blob
* @throws IOException
*/
void putBlob(String containerName, Blob blob) throws IOException;
String putBlob(String containerName, Blob blob) throws IOException;
/**
* @param containerName name of container

View File

@ -34,7 +34,6 @@ import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.SortedSet;
@ -43,9 +42,6 @@ import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.UriBuilder;
import org.jclouds.Constants;
import org.jclouds.blobstore.domain.Blob;
@ -67,9 +63,6 @@ import org.jclouds.blobstore.strategy.IfDirectoryReturnNameStrategy;
import org.jclouds.blobstore.util.BlobStoreUtils;
import org.jclouds.blobstore.util.BlobUtils;
import org.jclouds.collect.Memoized;
import org.jclouds.crypto.Crypto;
import org.jclouds.crypto.CryptoStreams;
import org.jclouds.date.DateService;
import org.jclouds.domain.Location;
import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpRequest;
@ -78,11 +71,7 @@ import org.jclouds.http.HttpResponseException;
import org.jclouds.http.HttpUtils;
import org.jclouds.io.ContentMetadata;
import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.io.MutableContentMetadata;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.DelegatingPayload;
import org.jclouds.logging.Logger;
import com.google.common.base.Function;
@ -90,7 +79,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -105,13 +93,10 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Resource
protected Logger logger = Logger.NULL;
protected final DateService dateService;
protected final Crypto crypto;
protected final ContentMetadataCodec contentMetadataCodec;
protected final IfDirectoryReturnNameStrategy ifDirectoryReturnName;
protected final Factory blobFactory;
protected final LocalStorageStrategy storageStrategy;
protected final Provider<UriBuilder> uriBuilders;
@Inject
protected TransientAsyncBlobStore(BlobStoreContext context,
@ -119,19 +104,14 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations,
DateService dateService, Crypto crypto,
ContentMetadataCodec contentMetadataCodec,
IfDirectoryReturnNameStrategy ifDirectoryReturnName,
Factory blobFactory, LocalStorageStrategy storageStrategy,
Provider<UriBuilder> uriBuilders) {
Factory blobFactory, LocalStorageStrategy storageStrategy) {
super(context, blobUtils, service, defaultLocation, locations);
this.blobFactory = blobFactory;
this.dateService = dateService;
this.crypto = crypto;
this.contentMetadataCodec = contentMetadataCodec;
this.ifDirectoryReturnName = ifDirectoryReturnName;
this.storageStrategy = storageStrategy;
this.uriBuilders = uriBuilders;
}
/**
@ -439,58 +419,13 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
return Futures.immediateFailedFuture(new IllegalStateException("containerName not found: " + containerName));
}
blob = createUpdatedCopyOfBlobInContainer(containerName, blob);
try {
storageStrategy.putBlob(containerName, blob);
return immediateFuture(storageStrategy.putBlob(containerName, blob));
} catch (IOException e) {
logger.error(e, "An error occurred storing the new blob with name [%s] to container [%s].", blobKey,
containerName);
Throwables.propagate(e);
throw Throwables.propagate(e);
}
String eTag = getEtag(blob);
return immediateFuture(eTag);
}
private Blob createUpdatedCopyOfBlobInContainer(String containerName, Blob in) {
checkNotNull(in, "blob");
checkNotNull(in.getPayload(), "blob.payload");
ByteArrayPayload payload = (in.getPayload() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(in
.getPayload()) : null;
if (payload == null)
payload = (in.getPayload() instanceof DelegatingPayload) ? (DelegatingPayload.class.cast(in.getPayload())
.getDelegate() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(DelegatingPayload.class
.cast(in.getPayload()).getDelegate()) : null : null;
try {
if (payload == null || !(payload instanceof ByteArrayPayload)) {
MutableContentMetadata oldMd = in.getPayload().getContentMetadata();
ByteArrayOutputStream out = new ByteArrayOutputStream();
in.getPayload().writeTo(out);
payload = (ByteArrayPayload) Payloads.calculateMD5(Payloads.newPayload(out.toByteArray()));
HttpUtils.copy(oldMd, payload.getContentMetadata());
} else {
if (payload.getContentMetadata().getContentMD5() == null)
Payloads.calculateMD5(in, crypto.md5());
}
} catch (IOException e) {
Throwables.propagate(e);
}
Blob blob = blobFactory.create(BlobStoreUtils.copy(in.getMetadata()));
blob.setPayload(payload);
blob.getMetadata().setContainer(containerName);
blob.getMetadata().setUri(
uriBuilders.get().scheme("mem").host(containerName).path(in.getMetadata().getName()).build());
blob.getMetadata().setLastModified(new Date());
String eTag = CryptoStreams.hex(payload.getContentMetadata().getContentMD5());
blob.getMetadata().setETag(eTag);
// Set HTTP headers to match metadata
blob.getAllHeaders().replaceValues(HttpHeaders.LAST_MODIFIED,
Collections.singleton(dateService.rfc822DateFormat(blob.getMetadata().getLastModified())));
blob.getAllHeaders().replaceValues(HttpHeaders.ETAG, Collections.singleton(eTag));
copyPayloadHeadersToBlob(payload, blob);
blob.getAllHeaders().putAll(Multimaps.forMap(blob.getMetadata().getUserMetadata()));
return blob;
}
private void copyPayloadHeadersToBlob(Payload payload, Blob blob) {
@ -613,24 +548,6 @@ public class TransientAsyncBlobStore extends BaseAsyncBlobStore {
}
}
/**
* Calculates the object MD5 and returns it as eTag
*
* @param object
* @return
*/
private String getEtag(Blob object) {
try {
Payloads.calculateMD5(object, crypto.md5());
} catch (IOException ex) {
logger.error(ex, "An error occurred calculating MD5 for object with name %s.", object.getMetadata().getName());
Throwables.propagate(ex);
}
String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
return eTag;
}
private Blob copyBlob(Blob blob) {
Blob returnVal = blobFactory.create(BlobStoreUtils.copy(blob.getMetadata()));
returnVal.setPayload(blob.getPayload());

View File

@ -18,26 +18,63 @@
*/
package org.jclouds.blobstore;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.inject.Provider;
import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.UriBuilder;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Multimaps;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.Blob.Factory;
import org.jclouds.blobstore.domain.MutableBlobMetadata;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.blobstore.util.BlobStoreUtils;
import org.jclouds.crypto.Crypto;
import org.jclouds.crypto.CryptoStreams;
import org.jclouds.date.DateService;
import org.jclouds.domain.Location;
import org.jclouds.http.HttpUtils;
import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.io.MutableContentMetadata;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.ByteArrayPayload;
import org.jclouds.io.payloads.DelegatingPayload;
public class TransientStorageStrategy implements LocalStorageStrategy {
private final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs = new ConcurrentHashMap<String, ConcurrentMap<String, Blob>>();
private final ConcurrentMap<String, Location> containerToLocation = new ConcurrentHashMap<String, Location>();
private final Supplier<Location> defaultLocation;
private final DateService dateService;
private final Factory blobFactory;
private final Crypto crypto;
private final ContentMetadataCodec contentMetadataCodec;
private final Provider<UriBuilder> uriBuilders;
@Inject
TransientStorageStrategy(final Supplier<Location> defaultLocation) {
TransientStorageStrategy(final Supplier<Location> defaultLocation,
DateService dateService, Factory blobFactory, Crypto crypto,
ContentMetadataCodec contentMetadataCodec,
Provider<UriBuilder> uriBuilders) {
this.defaultLocation = defaultLocation;
this.dateService = dateService;
this.blobFactory = blobFactory;
this.crypto = crypto;
this.contentMetadataCodec = contentMetadataCodec;
this.uriBuilders = uriBuilders;
}
public Iterable<String> getAllContainerNames() {
@ -82,9 +119,13 @@ public class TransientStorageStrategy implements LocalStorageStrategy {
return map == null ? null : map.get(blobName);
}
public void putBlob(final String containerName, final Blob blob) {
public String putBlob(final String containerName, final Blob blob) throws IOException {
Blob newBlob = createUpdatedCopyOfBlobInContainer(containerName, blob);
Map<String, Blob> map = containerToBlobs.get(containerName);
map.put(blob.getMetadata().getName(), blob);
map.put(newBlob.getMetadata().getName(), newBlob);
Payloads.calculateMD5(newBlob, crypto.md5());
String eTag = CryptoStreams.hex(newBlob.getPayload().getContentMetadata().getContentMD5());
return eTag;
}
public void removeBlob(final String containerName, final String blobName) {
@ -101,4 +142,48 @@ public class TransientStorageStrategy implements LocalStorageStrategy {
public Location getLocation(final String containerName) {
return containerToLocation.get(containerName);
}
private Blob createUpdatedCopyOfBlobInContainer(String containerName, Blob in) {
checkNotNull(in, "blob");
checkNotNull(in.getPayload(), "blob.payload");
ByteArrayPayload payload = (in.getPayload() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(in
.getPayload()) : null;
if (payload == null)
payload = (in.getPayload() instanceof DelegatingPayload) ? (DelegatingPayload.class.cast(in.getPayload())
.getDelegate() instanceof ByteArrayPayload) ? ByteArrayPayload.class.cast(DelegatingPayload.class
.cast(in.getPayload()).getDelegate()) : null : null;
try {
if (payload == null || !(payload instanceof ByteArrayPayload)) {
MutableContentMetadata oldMd = in.getPayload().getContentMetadata();
ByteArrayOutputStream out = new ByteArrayOutputStream();
in.getPayload().writeTo(out);
payload = (ByteArrayPayload) Payloads.calculateMD5(Payloads.newPayload(out.toByteArray()));
HttpUtils.copy(oldMd, payload.getContentMetadata());
} else {
if (payload.getContentMetadata().getContentMD5() == null)
Payloads.calculateMD5(in, crypto.md5());
}
} catch (IOException e) {
Throwables.propagate(e);
}
Blob blob = blobFactory.create(BlobStoreUtils.copy(in.getMetadata()));
blob.setPayload(payload);
blob.getMetadata().setContainer(containerName);
blob.getMetadata().setUri(
uriBuilders.get().scheme("mem").host(containerName).path(in.getMetadata().getName()).build());
blob.getMetadata().setLastModified(new Date());
String eTag = CryptoStreams.hex(payload.getContentMetadata().getContentMD5());
blob.getMetadata().setETag(eTag);
// Set HTTP headers to match metadata
blob.getAllHeaders().replaceValues(HttpHeaders.LAST_MODIFIED,
Collections.singleton(dateService.rfc822DateFormat(blob.getMetadata().getLastModified())));
blob.getAllHeaders().replaceValues(HttpHeaders.ETAG, Collections.singleton(eTag));
copyPayloadHeadersToBlob(payload, blob);
blob.getAllHeaders().putAll(Multimaps.forMap(blob.getMetadata().getUserMetadata()));
return blob;
}
private void copyPayloadHeadersToBlob(Payload payload, Blob blob) {
blob.getAllHeaders().putAll(contentMetadataCodec.toHeaders(payload.getContentMetadata()));
}
}