JCLOUDS-894: Add portable multipart upload for GCS

This commit is contained in:
Andrew Gaul 2015-08-26 12:48:44 -07:00
parent e222567821
commit 2e1e109a0c
7 changed files with 47 additions and 367 deletions

View File

@ -58,15 +58,16 @@ import org.jclouds.googlecloudstorage.blobstore.functions.BlobStoreListContainer
import org.jclouds.googlecloudstorage.blobstore.functions.BucketToStorageMetadata;
import org.jclouds.googlecloudstorage.blobstore.functions.ObjectListToStorageMetadata;
import org.jclouds.googlecloudstorage.blobstore.functions.ObjectToBlobMetadata;
import org.jclouds.googlecloudstorage.blobstore.strategy.internal.MultipartUploadStrategy;
import org.jclouds.googlecloudstorage.domain.Bucket;
import org.jclouds.googlecloudstorage.domain.DomainResourceReferences;
import org.jclouds.googlecloudstorage.domain.GoogleCloudStorageObject;
import org.jclouds.googlecloudstorage.domain.ListPageWithPrefixes;
import org.jclouds.googlecloudstorage.domain.ObjectAccessControls;
import org.jclouds.googlecloudstorage.domain.templates.BucketTemplate;
import org.jclouds.googlecloudstorage.domain.templates.ComposeObjectTemplate;
import org.jclouds.googlecloudstorage.domain.templates.ObjectAccessControlsTemplate;
import org.jclouds.googlecloudstorage.domain.templates.ObjectTemplate;
import org.jclouds.googlecloudstorage.options.InsertObjectOptions;
import org.jclouds.googlecloudstorage.options.ListObjectOptions;
import org.jclouds.http.HttpResponseException;
import org.jclouds.io.ContentMetadata;
@ -77,6 +78,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.hash.HashCode;
import com.google.inject.Provider;
@ -90,7 +92,6 @@ public final class GoogleCloudStorageBlobStore extends BaseBlobStore {
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
private final BlobMetadataToObjectTemplate blobMetadataToObjectTemplate;
private final BlobStoreListContainerOptionsToListObjectOptions listContainerOptionsToListObjectOptions;
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
private final Supplier<String> projectId;
private final BlobToHttpGetOptions blob2ObjectGetOptions;
@ -101,7 +102,7 @@ public final class GoogleCloudStorageBlobStore extends BaseBlobStore {
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
BlobMetadataToObjectTemplate blobMetadataToObjectTemplate,
BlobStoreListContainerOptionsToListObjectOptions listContainerOptionsToListObjectOptions,
Provider<MultipartUploadStrategy> multipartUploadStrategy, @CurrentProject Supplier<String> projectId,
@CurrentProject Supplier<String> projectId,
BlobToHttpGetOptions blob2ObjectGetOptions) {
super(context, blobUtils, defaultLocation, locations, slicer);
this.api = api;
@ -112,7 +113,6 @@ public final class GoogleCloudStorageBlobStore extends BaseBlobStore {
this.blobMetadataToObjectTemplate = blobMetadataToObjectTemplate;
this.listContainerOptionsToListObjectOptions = listContainerOptionsToListObjectOptions;
this.projectId = projectId;
this.multipartUploadStrategy = multipartUploadStrategy;
this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
}
@ -231,7 +231,7 @@ public final class GoogleCloudStorageBlobStore extends BaseBlobStore {
@Override
public String putBlob(String container, Blob blob, PutOptions options) {
if (options.isMultipart()) {
return multipartUploadStrategy.get().execute(container, blob);
return putMultipartBlob(container, blob, options);
} else {
return putBlob(container, blob);
}
@ -354,41 +354,74 @@ public final class GoogleCloudStorageBlobStore extends BaseBlobStore {
/**
 * Starts a multipart upload for the blob described by {@code blobMetadata}.
 *
 * <p>No request is sent from this method: it only builds the {@link MultipartUpload} handle.
 * The blob name doubles as the upload id.
 *
 * @param container name of the container that will hold the blob
 * @param blobMetadata metadata of the blob to assemble; its name is used as blob name and upload id
 * @return the handle to pass to the other multipart operations
 */
@Override
public MultipartUpload initiateMultipartUpload(String container, BlobMetadata blobMetadata) {
   String blobName = blobMetadata.getName();
   // The upload id is simply the blob name.
   String uploadId = blobName;
   return MultipartUpload.create(container, blobName, uploadId, blobMetadata);
}
/**
 * Aborts a multipart upload by removing the part objects uploaded so far.
 *
 * <p>The parts are discovered with {@code listMultipartUpload}, mapped to their object names and
 * removed with one {@code removeBlobs} call.
 *
 * @param mpu the upload to abort
 */
@Override
public void abortMultipartUpload(MultipartUpload mpu) {
   ImmutableList.Builder<String> partNames = ImmutableList.builder();
   for (MultipartPart part : listMultipartUpload(mpu)) {
      partNames.add(getMPUPartName(mpu, part.partNumber()));
   }
   removeBlobs(mpu.containerName(), partNames.build());
}
/**
 * Completes a multipart upload by composing the given parts into the final blob.
 *
 * <p>Each part's stored object is fetched by name, in the order given by {@code parts}, and all
 * of them are handed to a single compose request whose destination is built from the upload's
 * blob metadata. The part objects are not removed here.
 *
 * @param mpu the upload to complete
 * @param parts the parts to compose, in final order
 * @return the ETag of the composed object
 */
@Override
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
   ImmutableList.Builder<GoogleCloudStorageObject> sources = ImmutableList.builder();
   for (MultipartPart part : parts) {
      String partName = getMPUPartName(mpu, part.partNumber());
      sources.add(api.getObjectApi().getObject(mpu.containerName(), partName));
   }
   ObjectTemplate destination = blobMetadataToObjectTemplate.apply(mpu.blobMetadata());
   ComposeObjectTemplate template = ComposeObjectTemplate.builder()
         .fromGoogleCloudStorageObject(sources.build())
         .destination(destination)
         .build();
   // TODO: delete components?
   return api.getObjectApi().composeObjects(mpu.containerName(), mpu.blobName(), template).etag();
}
/**
 * Uploads one part of a multipart upload as its own object.
 *
 * <p>The part is stored under the name produced by {@code getMPUPartName} using the content type
 * of the upload's blob metadata.
 *
 * @param mpu the upload the part belongs to
 * @param partNumber number of the part; it becomes the suffix of the part's object name
 * @param payload content of the part; its content length must be set
 * @return a descriptor holding the part number, its size and the ETag of the stored object
 * @throws NullPointerException if the payload's content length is not set
 */
@Override
public MultipartPart uploadMultipartPart(MultipartUpload mpu, int partNumber, Payload payload) {
   String partName = getMPUPartName(mpu, partNumber);
   // The content length is a boxed Long; fail with a message instead of a bare unboxing NPE.
   Long contentLength = payload.getContentMetadata().getContentLength();
   if (contentLength == null) {
      throw new NullPointerException(
            "please invoke payload.getContentMetadata().setContentLength(length) prior to multipart upload");
   }
   long partSize = contentLength;
   InsertObjectOptions insertOptions = new InsertObjectOptions().name(partName);
   GoogleCloudStorageObject object = api.getObjectApi().simpleUpload(mpu.containerName(),
         mpu.blobMetadata().getContentMetadata().getContentType(), partSize, payload, insertOptions);
   return MultipartPart.create(partNumber, partSize, object.etag());
}
/**
 * Lists the parts uploaded so far for a multipart upload.
 *
 * <p>Parts are found by listing the container with the prefix {@code blobName + "_"}; the part
 * number is parsed from the text after the last underscore of each listed name. Only the page
 * returned by that one listing call is examined.
 *
 * <p>NOTE(review): every listed name with that prefix is treated as a part, so a name whose
 * suffix is not a decimal number makes {@link Integer#parseInt(String)} throw.
 *
 * @param mpu the upload whose parts are listed
 * @return the parts found, in listing order
 * @throws NumberFormatException if a listed name does not end in a decimal part number
 */
@Override
public List<MultipartPart> listMultipartUpload(MultipartUpload mpu) {
   ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
   PageSet<? extends StorageMetadata> page = list(mpu.containerName(),
         new ListContainerOptions().prefix(mpu.blobName() + "_"));
   // TODO: pagination
   for (StorageMetadata entry : page) {
      String name = entry.getName();
      int partNumber = Integer.parseInt(name.substring(name.lastIndexOf('_') + 1));
      parts.add(MultipartPart.create(partNumber, entry.getSize(), entry.getETag()));
   }
   return parts.build();
}
/**
 * Returns the minimum part size reported for multipart uploads.
 *
 * @return 5 * 1024 * 1024 bytes
 */
@Override
public long getMinimumMultipartPartSize() {
   return 5L * 1024L * 1024L;
}
/**
 * Returns the maximum part size reported for multipart uploads.
 *
 * @return 5 * 1024 * 1024 * 1024 bytes
 */
@Override
public long getMaximumMultipartPartSize() {
   return 5L * 1024L * 1024L * 1024L;
}
/**
 * Returns the maximum number of parts reported for a multipart upload.
 *
 * @return 10,000
 */
@Override
public int getMaximumNumberOfParts() {
   // TODO: should this be 32? See: https://cloud.google.com/storage/docs/composite-objects
   return 10 * 1000;
}
/**
 * Builds the name of the object that stores one part of {@code mpu}: the upload id, an
 * underscore, and the part number zero-padded to at least eight digits.
 */
private static String getMPUPartName(MultipartUpload mpu, int partNumber) {
   String uploadId = mpu.id();
   return String.format("%s_%08d", uploadId, partNumber);
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.googlecloudstorage.blobstore.strategy.internal;
import javax.inject.Singleton;
@Singleton
public class MultipartNamingStrategy {

   /** Text placed between the blob key and the part number. */
   private static final String PART_SEPARATOR = "_";

   /**
    * Builds the object name for one part of a multipart upload.
    *
    * <p>The name is {@code key + "_" + partNumber}, with the part number zero-padded to the
    * number of decimal digits of {@code totalParts}. A {@code totalParts} below 1 is treated as
    * 1, so the padding width is never smaller than one digit.
    *
    * @param key name of the blob being uploaded
    * @param partNumber number of the part
    * @param totalParts part count used to choose the padding width
    * @return the part's object name
    */
   protected String getPartName(String key, int partNumber, int totalParts) {
      // Counting decimal digits avoids log10(0) = -Infinity, which used to yield an invalid width.
      int width = Integer.toString(Math.max(totalParts, 1)).length();
      return String.format("%s%s%0" + width + "d", key, PART_SEPARATOR, partNumber);
   }
}

View File

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.googlecloudstorage.blobstore.strategy.internal;
/**
 * Limits that apply to multipart uploads. Holds constants only and cannot be instantiated.
 */
public final class MultipartUpload {

   /** Maximum number of parts per upload. */
   public static final int MAX_NUMBER_OF_PARTS = 10 * 1000;

   /** Maximum number of parts returned for a list parts request. */
   public static final int MAX_LIST_PARTS_RETURNED = 1000;

   /** Maximum number of multipart uploads returned in a list multipart uploads request. */
   public static final int MAX_LIST_MPU_RETURNED = 1000;

   /** Smallest part size, 5 MB; the last part of an upload can be smaller. */
   public static final long MIN_PART_SIZE = 5L << 20;

   /** Largest part size, 5 GB. */
   public static final long MAX_PART_SIZE = 5L << 30;

   private MultipartUpload() {
      // constants holder
   }
}

View File

@ -1,139 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.googlecloudstorage.blobstore.strategy.internal;
import javax.annotation.Resource;
import javax.inject.Named;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
/**
 * Works out how a payload of a given length is sliced into parts for a multipart upload, and
 * keeps the running counters (current part, chunk offset, bytes copied) used while uploading.
 *
 * <p>{@link #calculateChunkSize(long)} must run before the getters return meaningful values.
 *
 * <p>NOTE(review): the state fields are volatile, which only gives visibility; the updates in
 * {@code getNextPart}, {@code addCopied} and {@code getNextChunkOffset} are read-modify-write
 * sequences and are not atomic.
 */
public class MultipartUploadSlicingAlgorithm {
// Injected logger; falls back to the no-op logger.
@Resource
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
protected Logger logger = Logger.NULL;
// Default starting part size: 32 * 1024 * 1024 bytes.
@VisibleForTesting
static final long DEFAULT_PART_SIZE = 32 * 1024 * 1024;
// Default divisor applied to the initial part count to obtain the "magnitude" multiplier.
@VisibleForTesting
static final int DEFAULT_MAGNITUDE_BASE = 100;
// Starting part size; optionally overridden through the "jclouds.mpu.parts.size" property.
@Inject(optional = true)
@Named("jclouds.mpu.parts.size")
@VisibleForTesting
long defaultPartSize = DEFAULT_PART_SIZE;
// Magnitude divisor; optionally overridden through the "jclouds.mpu.parts.magnitude" property.
@Inject(optional = true)
@Named("jclouds.mpu.parts.magnitude")
@VisibleForTesting
int magnitudeBase = DEFAULT_MAGNITUDE_BASE;
// calculated only once, but not from the constructor
private volatile int parts; // required number of parts with chunkSize
private volatile long chunkSize;
private volatile long remaining; // number of bytes remained for the last part
// sequentially updated values
private volatile int part;
private volatile long chunkOffset;
private volatile long copied;
/**
 * Computes the slicing for a payload of {@code length} bytes and stores the outcome in the
 * {@code chunkSize}, {@code parts} and {@code remaining} fields.
 *
 * <p>The part size starts at {@code defaultPartSize}. When the resulting part count divided by
 * {@code magnitudeBase} is positive, the part size is multiplied by that magnitude (and by
 * magnitude + 1 if the parts still do not cover the length), capped at
 * {@link MultipartUpload#MAX_PART_SIZE}. The part count is then limited with respect to
 * {@link MultipartUpload#MAX_NUMBER_OF_PARTS}.
 *
 * @param length total number of bytes to upload
 * @return the chosen chunk size
 */
@VisibleForTesting
protected long calculateChunkSize(long length) {
long unitPartSize = defaultPartSize; // first try with default part size
int parts = (int) (length / unitPartSize);
long partSize = unitPartSize;
// How many multiples of magnitudeBase the initial part count amounts to.
int magnitude = parts / magnitudeBase;
if (magnitude > 0) {
// Scale the part size up by the magnitude, capped at the maximum part size.
partSize = magnitude * unitPartSize;
if (partSize > MultipartUpload.MAX_PART_SIZE) {
partSize = MultipartUpload.MAX_PART_SIZE;
unitPartSize = MultipartUpload.MAX_PART_SIZE;
}
parts = (int) (length / partSize);
// Parts of this size do not cover the whole length: retry with one more unit per part.
if (parts * partSize < length) {
partSize = (magnitude + 1) * unitPartSize;
if (partSize > MultipartUpload.MAX_PART_SIZE) {
partSize = MultipartUpload.MAX_PART_SIZE;
unitPartSize = MultipartUpload.MAX_PART_SIZE;
}
parts = (int) (length / partSize);
}
}
// NOTE(review): this branch lowers unitPartSize only; partSize, which becomes chunkSize
// below, is left unchanged - confirm that is intended.
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
// cannot be split
unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size
parts = (int) (length / unitPartSize);
}
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
// covering
}
// When the length is an exact multiple of unitPartSize, count one part fewer; those bytes
// then show up in "remaining" below - presumably so the final part is never empty.
long remainder = length % unitPartSize;
if (remainder == 0 && parts > 0) {
parts -= 1;
}
this.chunkSize = partSize;
this.parts = parts;
// Bytes left over after "parts" chunks of "partSize" bytes.
this.remaining = length - partSize * parts;
// The message is suffixed with " overflow!" when the leftover exceeds the maximum part size.
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
remaining, remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : "");
return this.chunkSize;
}
/** Returns the number of bytes recorded as copied so far. */
public long getCopied() {
return copied;
}
/** Overwrites the number of bytes recorded as copied. */
public void setCopied(long copied) {
this.copied = copied;
}
/** Returns the part count stored by {@link #calculateChunkSize(long)}. */
@VisibleForTesting
protected int getParts() {
return parts;
}
/** Increments the current part counter and returns the new value, so the first call returns 1. */
protected int getNextPart() {
return ++part;
}
/** Adds {@code copied} to the number of bytes recorded as copied. */
protected void addCopied(long copied) {
this.copied += copied;
}
/** Returns the current chunk offset, then advances the stored offset by one chunk size. */
protected long getNextChunkOffset() {
long next = chunkOffset;
chunkOffset += getChunkSize();
return next;
}
/** Returns the chunk size stored by {@link #calculateChunkSize(long)}. */
@VisibleForTesting
protected long getChunkSize() {
return chunkSize;
}
/** Returns the byte count stored for the last part by {@link #calculateChunkSize(long)}. */
@VisibleForTesting
protected long getRemaining() {
return remaining;
}
}

View File

@ -1,26 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.googlecloudstorage.blobstore.strategy.internal;
import org.jclouds.blobstore.domain.Blob;
import com.google.inject.ImplementedBy;
/**
 * Strategy for storing a blob through a multipart upload. The {@code @ImplementedBy} annotation
 * names {@link SequentialMultipartUploadStrategy} as the default implementation.
 */
@ImplementedBy(SequentialMultipartUploadStrategy.class)
public abstract class MultipartUploadStrategy {
/**
 * Uploads {@code blob} into {@code container}.
 *
 * @param container name of the target container
 * @param blob the blob to upload
 * @return a string identifying the stored blob; the default implementation returns its ETag
 */
public abstract String execute(String container, Blob blob);
}

View File

@ -1,100 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.googlecloudstorage.blobstore.strategy.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
import javax.inject.Provider;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.googlecloudstorage.GoogleCloudStorageApi;
import org.jclouds.googlecloudstorage.blobstore.functions.BlobMetadataToObjectTemplate;
import org.jclouds.googlecloudstorage.domain.GoogleCloudStorageObject;
import org.jclouds.googlecloudstorage.domain.templates.ComposeObjectTemplate;
import org.jclouds.googlecloudstorage.domain.templates.ObjectTemplate;
import org.jclouds.googlecloudstorage.options.InsertObjectOptions;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
/**
 * Multipart upload strategy that uploads the slices of a payload one after another and then
 * composes the uploaded part objects into the destination object.
 */
public final class SequentialMultipartUploadStrategy extends MultipartUploadStrategy {

   private final GoogleCloudStorageApi api;
   private final Provider<BlobBuilder> blobBuilders;
   private final BlobMetadataToObjectTemplate blob2ObjectTemplate;
   private final MultipartUploadSlicingAlgorithm algorithm;
   private final PayloadSlicer slicer;
   private final MultipartNamingStrategy namingStrategy;

   @Inject
   SequentialMultipartUploadStrategy(GoogleCloudStorageApi api, Provider<BlobBuilder> blobBuilders,
         BlobMetadataToObjectTemplate blob2ObjectTemplate, MultipartUploadSlicingAlgorithm algorithm,
         PayloadSlicer slicer, MultipartNamingStrategy namingStrategy) {
      this.api = api;
      this.blobBuilders = blobBuilders;
      this.blob2ObjectTemplate = blob2ObjectTemplate;
      this.algorithm = algorithm;
      this.slicer = slicer;
      this.namingStrategy = namingStrategy;
   }

   /**
    * Uploads {@code blob} into {@code container} and returns the ETag of the resulting object.
    *
    * <p>If the payload carries no content length, the one from the blob metadata is copied onto
    * it; a length must be available from one of the two. When the slicing algorithm reports no
    * parts, the blob is sent with a single {@code multipartUpload} request instead of being
    * sliced and composed.
    *
    * @param container name of the target container
    * @param blob the blob to upload
    * @return the ETag of the stored object
    */
   @Override
   public String execute(String container, Blob blob) {
      ObjectTemplate destination = blob2ObjectTemplate.apply(blob.getMetadata());
      List<GoogleCloudStorageObject> uploadedParts = Lists.newArrayList();
      String blobName = blob.getMetadata().getName();
      Payload payload = blob.getPayload();

      Long length = payload.getContentMetadata().getContentLength();
      if (length == null) {
         // Fall back to the length recorded in the blob metadata.
         length = blob.getMetadata().getContentMetadata().getContentLength();
         payload.getContentMetadata().setContentLength(length);
      }
      checkNotNull(length,
            "please invoke payload.getContentMetadata().setContentLength(length) prior to multipart upload");

      long chunkSize = algorithm.calculateChunkSize(length);
      int partCount = algorithm.getParts();

      if (!(partCount > 0)) {
         // Nothing to slice: send the whole payload in one request.
         return api.getObjectApi()
               .multipartUpload(container, blob2ObjectTemplate.apply(blob.getMetadata()), blob.getPayload())
               .etag();
      }

      for (Payload slice : slicer.slice(payload, chunkSize)) {
         int partNum = algorithm.getNextPart();
         String partName = namingStrategy.getPartName(blobName, partNum, partCount);
         // The slice after the last full chunk carries the leftover bytes.
         long partSize;
         if (partNum == partCount + 1) {
            partSize = algorithm.getRemaining();
         } else {
            partSize = algorithm.getChunkSize();
         }
         InsertObjectOptions insertOptions = new InsertObjectOptions().name(partName);
         GoogleCloudStorageObject uploaded = api.getObjectApi().simpleUpload(container,
               blob.getMetadata().getContentMetadata().getContentType(), partSize, slice, insertOptions);
         uploadedParts.add(uploaded);
      }

      ComposeObjectTemplate template = ComposeObjectTemplate.builder()
            .fromGoogleCloudStorageObject(uploadedParts)
            .destination(destination)
            .build();
      return api.getObjectApi().composeObjects(container, blobName, template).etag();
   }
}

View File

@ -38,7 +38,6 @@ import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.googlecloud.internal.TestProperties;
import org.jclouds.googlecloudstorage.blobstore.strategy.internal.MultipartUpload;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.ByteSourcePayload;
import org.jclouds.utils.TestUtils;
@ -57,7 +56,7 @@ import com.google.common.io.Files;
@Test(groups = { "live", "blobstorelive" })
public class GoogleCloudStorageBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
private long PART_SIZE = MultipartUpload.MIN_PART_SIZE;
private long PART_SIZE = 5L * 1024L * 1024L;
@Override
protected long getMinimumMultipartBlobSize() {
@ -93,27 +92,6 @@ public class GoogleCloudStorageBlobIntegrationLiveTest extends BaseBlobIntegrati
// not supported in object level.
}
/**
 * Override that skips the single-part multipart upload test: it always throws
 * {@code SkipException} instead of running a test body.
 */
@Override
@Test(groups = { "integration", "live" })
public void testMultipartUploadSinglePart() throws SkipException {
throw new SkipException("Implement MultipartUploads uploads");
// TODO: Implement MultipartUploads uploads
}
/**
 * Override that skips the multiple-parts multipart upload test: it always throws
 * {@code SkipException} instead of running a test body.
 */
@Override
@Test(groups = { "integration", "live" })
public void testMultipartUploadMultipleParts() throws SkipException {
throw new SkipException("Implement MultipartUploads uploads");
// TODO: Implement MultipartUploads uploads
}
/**
 * Override that skips the abort-without-parts multipart upload test: it always throws
 * {@code SkipException} instead of running a test body.
 */
@Override
@Test(groups = { "integration", "live" })
public void testMultipartUploadNoPartsAbort() throws SkipException {
throw new SkipException("Implement MultipartUploads uploads");
// TODO: Implement MultipartUploads uploads
}
@Override
@Test(groups = { "integration", "live" }, dataProvider = "gcsPutTest")
public void testPutObject(String name, String type, Object content, Object realObject) throws InterruptedException,