JCLOUDS-1450: Use S3-style ETags for MPUs.

S3 uses a different ETag for multipart uploads (MPUs) than regular
objects. The ETag consists of the md5 hash of the concatenated ETags of
individual parts followed by the number of parts (separated by "-").

The patch changes the LocalBlobStore's implementation of
CompleteMultipartUpload to set the S3-style ETag before calling
putBlob() and return that ETag to the caller.

To simplify testing, a new protected method with a default NOOP
implementation is added to the BaseBlobIntegrationTest. It allows
providers to further verify MPUs (i.e. ensuring the correct ETag has
been stored alongside the object).
This commit is contained in:
Timur Alperovich 2018-10-23 17:36:43 -07:00 committed by Andrew Gaul
parent 219e2958d7
commit 539a9854c1
6 changed files with 104 additions and 16 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.jclouds.filesystem.strategy.internal; package org.jclouds.filesystem.strategy.internal;
import static com.google.common.base.Charsets.US_ASCII;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.io.BaseEncoding.base16; import static com.google.common.io.BaseEncoding.base16;
@ -35,6 +36,7 @@ import static org.jclouds.util.Closeables2.closeQuietly;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException; import java.nio.file.AccessDeniedException;
@ -48,6 +50,7 @@ import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.regex.Pattern;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject; import javax.inject.Inject;
@ -115,6 +118,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
private static final String XATTR_USER_METADATA_PREFIX = "user.user-metadata."; private static final String XATTR_USER_METADATA_PREFIX = "user.user-metadata.";
private static final byte[] DIRECTORY_MD5 = private static final byte[] DIRECTORY_MD5 =
Hashing.md5().hashBytes(new byte[0]).asBytes(); Hashing.md5().hashBytes(new byte[0]).asBytes();
private static final Pattern MPU_ETAG_FORMAT = Pattern.compile("\"[a-f0-9]{32}-\\d+\"");
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
@ -353,6 +357,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
String contentLanguage = null; String contentLanguage = null;
String contentType = null; String contentType = null;
HashCode hashCode = null; HashCode hashCode = null;
String eTag = null;
Date expires = null; Date expires = null;
Tier tier = Tier.STANDARD; Tier tier = Tier.STANDARD;
ImmutableMap.Builder<String, String> userMetadata = ImmutableMap.builder(); ImmutableMap.Builder<String, String> userMetadata = ImmutableMap.builder();
@ -373,7 +378,15 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
if (attributes.contains(XATTR_CONTENT_MD5)) { if (attributes.contains(XATTR_CONTENT_MD5)) {
ByteBuffer buf = ByteBuffer.allocate(view.size(XATTR_CONTENT_MD5)); ByteBuffer buf = ByteBuffer.allocate(view.size(XATTR_CONTENT_MD5));
view.read(XATTR_CONTENT_MD5, buf); view.read(XATTR_CONTENT_MD5, buf);
hashCode = HashCode.fromBytes(buf.array()); byte [] etagBytes = buf.array();
if (etagBytes.length == 16) {
// regular object
hashCode = HashCode.fromBytes(buf.array());
eTag = "\"" + hashCode + "\"";
} else {
// multi-part object
eTag = new String(etagBytes, US_ASCII);
}
} }
if (attributes.contains(XATTR_EXPIRES)) { if (attributes.contains(XATTR_EXPIRES)) {
ByteBuffer buf = ByteBuffer.allocate(view.size(XATTR_EXPIRES)); ByteBuffer buf = ByteBuffer.allocate(view.size(XATTR_EXPIRES));
@ -403,6 +416,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
.contentLanguage(contentLanguage) .contentLanguage(contentLanguage)
.contentLength(byteSource.size()) .contentLength(byteSource.size())
.contentMD5(hashCode) .contentMD5(hashCode)
.eTag(eTag)
.contentType(contentType) .contentType(contentType)
.expires(expires) .expires(expires)
.tier(tier) .tier(tier)
@ -488,23 +502,36 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
String tmpBlobName = blobKey + "-" + UUID.randomUUID(); String tmpBlobName = blobKey + "-" + UUID.randomUUID();
File tmpFile = getFileForBlobKey(containerName, tmpBlobName); File tmpFile = getFileForBlobKey(containerName, tmpBlobName);
Path tmpPath = tmpFile.toPath(); Path tmpPath = tmpFile.toPath();
HashingInputStream his = null; boolean isMpu = false;
if (blob.getMetadata() != null && blob.getMetadata().getETag() != null)
isMpu = MPU_ETAG_FORMAT.matcher(blob.getMetadata().getETag()).matches();
InputStream inputStream = null;
byte[] eTag = null;
try { try {
Files.createParentDirs(tmpFile); Files.createParentDirs(tmpFile);
his = new HashingInputStream(Hashing.md5(), payload.openStream()); if (isMpu) {
long actualSize = Files.asByteSink(tmpFile).writeFrom(his); inputStream = payload.openStream();
eTag = blob.getMetadata().getETag().getBytes();
} else {
inputStream = new HashingInputStream(Hashing.md5(), payload.openStream());
}
long actualSize = Files.asByteSink(tmpFile).writeFrom(inputStream);
Long expectedSize = blob.getMetadata().getContentMetadata().getContentLength(); Long expectedSize = blob.getMetadata().getContentMetadata().getContentLength();
if (expectedSize != null && actualSize != expectedSize) { if (expectedSize != null && actualSize != expectedSize) {
throw new IOException("Content-Length mismatch, actual: " + actualSize + throw new IOException("Content-Length mismatch, actual: " + actualSize +
" expected: " + expectedSize); " expected: " + expectedSize);
} }
HashCode actualHashCode = his.hash();
HashCode expectedHashCode = payload.getContentMetadata().getContentMD5AsHashCode(); if (!isMpu) {
if (expectedHashCode != null && !actualHashCode.equals(expectedHashCode)) { HashCode actualHashCode = ((HashingInputStream) inputStream).hash();
throw new IOException("MD5 hash code mismatch, actual: " + actualHashCode + HashCode expectedHashCode = payload.getContentMetadata().getContentMD5AsHashCode();
" expected: " + expectedHashCode); if (expectedHashCode != null && !actualHashCode.equals(expectedHashCode)) {
throw new IOException("MD5 hash code mismatch, actual: " + actualHashCode +
" expected: " + expectedHashCode);
}
payload.getContentMetadata().setContentMD5(actualHashCode);
eTag = actualHashCode.asBytes();
} }
payload.getContentMetadata().setContentMD5(actualHashCode);
if (outputFile.exists()) { if (outputFile.exists()) {
delete(outputFile); delete(outputFile);
@ -513,7 +540,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
UserDefinedFileAttributeView view = getUserDefinedFileAttributeView(tmpPath); UserDefinedFileAttributeView view = getUserDefinedFileAttributeView(tmpPath);
if (view != null) { if (view != null) {
try { try {
view.write(XATTR_CONTENT_MD5, ByteBuffer.wrap(actualHashCode.asBytes())); view.write(XATTR_CONTENT_MD5, ByteBuffer.wrap(eTag));
writeCommonMetadataAttr(view, blob); writeCommonMetadataAttr(view, blob);
} catch (IOException e) { } catch (IOException e) {
logger.debug("xattrs not supported on %s", tmpPath); logger.debug("xattrs not supported on %s", tmpPath);
@ -527,7 +554,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
} }
tmpFile = null; tmpFile = null;
return base16().lowerCase().encode(actualHashCode.asBytes()); return base16().lowerCase().encode(eTag);
} finally { } finally {
if (tmpFile != null) { if (tmpFile != null) {
try { try {
@ -536,7 +563,7 @@ public class FilesystemStorageStrategyImpl implements LocalStorageStrategy {
logger.debug("Could not delete %s: %s", tmpFile, e); logger.debug("Could not delete %s: %s", tmpFile, e);
} }
} }
closeQuietly(his); closeQuietly(inputStream);
if (payload != null) { if (payload != null) {
payload.release(); payload.release();
} }

View File

@ -19,11 +19,17 @@ package org.jclouds.filesystem.integration;
import static org.jclouds.filesystem.util.Utils.isMacOSX; import static org.jclouds.filesystem.util.Utils.isMacOSX;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.hash.Hashing;
import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.Tier; import org.jclouds.blobstore.domain.Tier;
import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest; import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest;
import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest; import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest;
@ -104,6 +110,22 @@ public class FilesystemBlobIntegrationTest extends BaseBlobIntegrationTest {
throw new SkipException("filesystem does not support anonymous access"); throw new SkipException("filesystem does not support anonymous access");
} }
@Override
protected void checkMPUParts(Blob blob, List<MultipartPart> partsList) {
assertThat(blob.getMetadata().getETag()).endsWith(String.format("-%d\"", partsList.size()));
StringBuilder eTags = new StringBuilder();
for (MultipartPart part : partsList) {
eTags.append(part.partETag());
}
String expectedETag = new StringBuilder("\"")
.append(Hashing.md5().hashString(eTags.toString(), US_ASCII))
.append("-")
.append(partsList.size())
.append("\"")
.toString();
assertThat(blob.getMetadata().getETag()).isEqualTo(expectedETag);
}
protected void checkExtendedAttributesSupport() { protected void checkExtendedAttributesSupport() {
if (isMacOSX()) { if (isMacOSX()) {
throw new SkipException("filesystem does not support extended attributes in Mac OSX"); throw new SkipException("filesystem does not support extended attributes in Mac OSX");

View File

@ -25,6 +25,7 @@ import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Iterables.tryFind; import static com.google.common.collect.Iterables.tryFind;
import static com.google.common.collect.Sets.filter; import static com.google.common.collect.Sets.filter;
import static com.google.common.collect.Sets.newTreeSet; import static com.google.common.collect.Sets.newTreeSet;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.io.File; import java.io.File;
@ -45,6 +46,7 @@ import javax.annotation.Resource;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import com.google.common.hash.Hashing;
import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.ContainerNotFoundException; import org.jclouds.blobstore.ContainerNotFoundException;
@ -824,6 +826,8 @@ public final class LocalBlobStore implements BlobStore {
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) { public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
ImmutableList.Builder<InputStream> streams = ImmutableList.builder(); ImmutableList.Builder<InputStream> streams = ImmutableList.builder();
long contentLength = 0; long contentLength = 0;
StringBuilder partHashes = new StringBuilder();
for (MultipartPart part : parts) { for (MultipartPart part : parts) {
Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
contentLength += blobPart.getMetadata().getContentMetadata().getContentLength(); contentLength += blobPart.getMetadata().getContentMetadata().getContentLength();
@ -834,11 +838,19 @@ public final class LocalBlobStore implements BlobStore {
throw propagate(ioe); throw propagate(ioe);
} }
streams.add(is); streams.add(is);
partHashes.append(blobPart.getMetadata().getETag());
} }
String mpuETag = new StringBuilder("\"")
.append(Hashing.md5().hashString(partHashes.toString(), US_ASCII).toString())
.append("-")
.append(parts.size())
.append("\"")
.toString();
PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName()) PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
.userMetadata(mpu.blobMetadata().getUserMetadata()) .userMetadata(mpu.blobMetadata().getUserMetadata())
.payload(new SequenceInputStream(Iterators.asEnumeration(streams.build().iterator()))) .payload(new SequenceInputStream(Iterators.asEnumeration(streams.build().iterator())))
.contentLength(contentLength); .contentLength(contentLength)
.eTag(mpuETag);
String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl(); String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl();
if (cacheControl != null) { if (cacheControl != null) {
blobBuilder.cacheControl(cacheControl); blobBuilder.cacheControl(cacheControl);
@ -869,7 +881,7 @@ public final class LocalBlobStore implements BlobStore {
blobBuilder.tier(tier); blobBuilder.tier(tier);
} }
String eTag = putBlob(mpu.containerName(), blobBuilder.build()); putBlob(mpu.containerName(), blobBuilder.build());
for (MultipartPart part : parts) { for (MultipartPart part : parts) {
removeBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber()); removeBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
@ -878,7 +890,7 @@ public final class LocalBlobStore implements BlobStore {
setBlobAccess(mpu.containerName(), mpu.blobName(), mpu.putOptions().getBlobAccess()); setBlobAccess(mpu.containerName(), mpu.blobName(), mpu.putOptions().getBlobAccess());
return eTag; return mpuETag;
} }
@Override @Override

View File

@ -57,6 +57,12 @@ public interface BlobBuilder {
*/ */
BlobBuilder userMetadata(Map<String, String> userMetadata); BlobBuilder userMetadata(Map<String, String> userMetadata);
/**
* @param eTag
* Entity Tag associated with the Blob. Typically, content MD5 hash.
*/
BlobBuilder eTag(String eTag);
/** /**
* *
* @param payload * @param payload
@ -132,5 +138,7 @@ public interface BlobBuilder {
PayloadBlobBuilder contentEncoding(String contentEncoding); PayloadBlobBuilder contentEncoding(String contentEncoding);
PayloadBlobBuilder expires(Date expires); PayloadBlobBuilder expires(Date expires);
PayloadBlobBuilder eTag(String string);
} }
} }

View File

@ -45,6 +45,7 @@ public class BlobBuilderImpl implements BlobBuilder {
private Tier tier = Tier.STANDARD; private Tier tier = Tier.STANDARD;
private Map<String, String> userMetadata = Maps.newLinkedHashMap(); private Map<String, String> userMetadata = Maps.newLinkedHashMap();
private StorageType type = StorageType.BLOB; private StorageType type = StorageType.BLOB;
private String eTag;
@Override @Override
public BlobBuilder name(String name) { public BlobBuilder name(String name) {
@ -66,6 +67,12 @@ public class BlobBuilderImpl implements BlobBuilder {
return this; return this;
} }
@Override
public BlobBuilder eTag(String eTag) {
this.eTag = eTag;
return this;
}
@Override @Override
public BlobBuilder userMetadata(Map<String, String> userMetadata) { public BlobBuilder userMetadata(Map<String, String> userMetadata) {
if (userMetadata != null) if (userMetadata != null)
@ -126,6 +133,7 @@ public class BlobBuilderImpl implements BlobBuilder {
blob.getMetadata().setUserMetadata(userMetadata); blob.getMetadata().setUserMetadata(userMetadata);
blob.getMetadata().setType(type); blob.getMetadata().setType(type);
blob.getMetadata().setTier(tier); blob.getMetadata().setTier(tier);
blob.getMetadata().setETag(eTag);
return blob; return blob;
} }
@ -255,6 +263,12 @@ public class BlobBuilderImpl implements BlobBuilder {
return this; return this;
} }
@Override
public PayloadBlobBuilder eTag(String eTag) {
this.builder.eTag(eTag);
return this;
}
@Override @Override
public PayloadBlobBuilder forSigning() { public PayloadBlobBuilder forSigning() {
return builder.forSigning(); return builder.forSigning();

View File

@ -899,6 +899,9 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
.getMetadata().getContentMetadata().getContentLanguage(); .getMetadata().getContentMetadata().getContentLanguage();
} }
protected void checkMPUParts(Blob newBlob, List<MultipartPart> parts) {
}
protected static volatile Crypto crypto; protected static volatile Crypto crypto;
static { static {
try { try {
@ -1321,6 +1324,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
assertThat(ByteStreams2.toByteArrayAndClose(newBlob.getPayload().openStream())).isEqualTo(byteSource.read()); assertThat(ByteStreams2.toByteArrayAndClose(newBlob.getPayload().openStream())).isEqualTo(byteSource.read());
checkContentMetadata(newBlob); checkContentMetadata(newBlob);
checkUserMetadata(newBlob.getMetadata().getUserMetadata(), blob.getMetadata().getUserMetadata()); checkUserMetadata(newBlob.getMetadata().getUserMetadata(), blob.getMetadata().getUserMetadata());
checkMPUParts(newBlob, parts);
} finally { } finally {
returnContainer(container); returnContainer(container);
} }
@ -1366,6 +1370,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
assertThat(ByteStreams2.toByteArrayAndClose(newBlob.getPayload().openStream())).isEqualTo(byteSource.read()); assertThat(ByteStreams2.toByteArrayAndClose(newBlob.getPayload().openStream())).isEqualTo(byteSource.read());
checkContentMetadata(newBlob); checkContentMetadata(newBlob);
checkUserMetadata(newBlob.getMetadata().getUserMetadata(), blob.getMetadata().getUserMetadata()); checkUserMetadata(newBlob.getMetadata().getUserMetadata(), blob.getMetadata().getUserMetadata());
checkMPUParts(newBlob, parts);
} finally { } finally {
returnContainer(container); returnContainer(container);
} }