JCLOUDS-894: Add portable multipart upload for Azure

This commit is contained in:
Andrew Gaul 2015-06-01 20:08:38 -07:00
parent f14514c068
commit 794b385c98
5 changed files with 6 additions and 277 deletions

View File

@ -26,7 +26,6 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton; import javax.inject.Singleton;
import org.jclouds.azure.storage.domain.BoundedSet; import org.jclouds.azure.storage.domain.BoundedSet;
@ -37,7 +36,6 @@ import org.jclouds.azureblob.blobstore.functions.BlobToAzureBlob;
import org.jclouds.azureblob.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.azureblob.blobstore.functions.ContainerToResourceMetadata;
import org.jclouds.azureblob.blobstore.functions.ListBlobsResponseToResourceList; import org.jclouds.azureblob.blobstore.functions.ListBlobsResponseToResourceList;
import org.jclouds.azureblob.blobstore.functions.ListOptionsToListBlobsOptions; import org.jclouds.azureblob.blobstore.functions.ListOptionsToListBlobsOptions;
import org.jclouds.azureblob.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.azureblob.domain.AzureBlob; import org.jclouds.azureblob.domain.AzureBlob;
import org.jclouds.azureblob.domain.BlobBlockProperties; import org.jclouds.azureblob.domain.BlobBlockProperties;
import org.jclouds.azureblob.domain.ContainerProperties; import org.jclouds.azureblob.domain.ContainerProperties;
@ -68,6 +66,7 @@ import org.jclouds.domain.Location;
import org.jclouds.http.options.GetOptions; import org.jclouds.http.options.GetOptions;
import org.jclouds.io.ContentMetadata; import org.jclouds.io.ContentMetadata;
import org.jclouds.io.MutableContentMetadata; import org.jclouds.io.MutableContentMetadata;
import org.jclouds.io.PayloadSlicer;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
@ -89,18 +88,17 @@ public class AzureBlobStore extends BaseBlobStore {
private final BlobToAzureBlob blob2AzureBlob; private final BlobToAzureBlob blob2AzureBlob;
private final BlobPropertiesToBlobMetadata blob2BlobMd; private final BlobPropertiesToBlobMetadata blob2BlobMd;
private final BlobToHttpGetOptions blob2ObjectGetOptions; private final BlobToHttpGetOptions blob2ObjectGetOptions;
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject @Inject
AzureBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation, AzureBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, AzureBlobClient sync, @Memoized Supplier<Set<? extends Location>> locations, PayloadSlicer slicer, AzureBlobClient sync,
ContainerToResourceMetadata container2ResourceMd, ContainerToResourceMetadata container2ResourceMd,
ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions, ListOptionsToListBlobsOptions blobStore2AzureContainerListOptions,
ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob, ListBlobsResponseToResourceList azure2BlobStoreResourceList, AzureBlobToBlob azureBlob2Blob,
BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd, BlobToAzureBlob blob2AzureBlob, BlobPropertiesToBlobMetadata blob2BlobMd,
BlobToHttpGetOptions blob2ObjectGetOptions, Provider<MultipartUploadStrategy> multipartUploadStrategy) { BlobToHttpGetOptions blob2ObjectGetOptions) {
super(context, blobUtils, defaultLocation, locations); super(context, blobUtils, defaultLocation, locations, slicer);
this.sync = checkNotNull(sync, "sync"); this.sync = checkNotNull(sync, "sync");
this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd"); this.container2ResourceMd = checkNotNull(container2ResourceMd, "container2ResourceMd");
this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions, this.blobStore2AzureContainerListOptions = checkNotNull(blobStore2AzureContainerListOptions,
@ -110,7 +108,6 @@ public class AzureBlobStore extends BaseBlobStore {
this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob"); this.blob2AzureBlob = checkNotNull(blob2AzureBlob, "blob2AzureBlob");
this.blob2BlobMd = checkNotNull(blob2BlobMd, "blob2BlobMd"); this.blob2BlobMd = checkNotNull(blob2BlobMd, "blob2BlobMd");
this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions"); this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
this.multipartUploadStrategy = checkNotNull(multipartUploadStrategy, "multipartUploadStrategy");
} }
/** /**
@ -227,7 +224,7 @@ public class AzureBlobStore extends BaseBlobStore {
@Override @Override
public String putBlob(String container, Blob blob, PutOptions options) { public String putBlob(String container, Blob blob, PutOptions options) {
if (options.isMultipart()) { if (options.isMultipart()) {
return multipartUploadStrategy.get().execute(container, blob); return putMultipartBlob(container, blob, options);
} }
return putBlob(container, blob); return putBlob(container, blob);
} }

View File

@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.azureblob.blobstore.strategy;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.jclouds.azureblob.AzureBlobClient;
import org.jclouds.azureblob.blobstore.functions.BlobToAzureBlob;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.logging.Logger;
import javax.annotation.Resource;
import javax.inject.Named;
import java.util.List;
import java.util.UUID;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/**
 * Decomposes a blob into blocks for upload and assembly through PutBlock and PutBlockList.
 *
 * <p>The payload is sliced into parts of at most {@link MultipartUploadStrategy#MAX_BLOCK_SIZE}
 * bytes. Each part is uploaded with {@code putBlock} under a freshly generated block id, and the
 * ids are then committed, in upload order, with {@code putBlockList}.
 */
public class AzureBlobBlockUploadStrategy implements MultipartUploadStrategy {
   @Resource
   @Named(BlobStoreConstants.BLOBSTORE_LOGGER)
   private Logger logger = Logger.NULL;

   private final AzureBlobClient client;
   private final PayloadSlicer slicer;
   private final BlobToAzureBlob blobToAzureBlob;

   @Inject
   AzureBlobBlockUploadStrategy(AzureBlobClient client, PayloadSlicer slicer, BlobToAzureBlob blobToAzureBlob) {
      this.client = client;
      this.slicer = slicer;
      this.blobToAzureBlob = blobToAzureBlob;
   }

   /**
    * Uploads {@code blob} to {@code container} block by block and commits the block list.
    *
    * @param container name of the container that receives the blob
    * @param blob blob to upload; its payload must have a content length set
    * @return the value returned by {@code putBlockList} for the committed blob
    * @throws NullPointerException if the payload has no content length
    * @throws IllegalArgumentException if the content length exceeds
    *         {@code MAX_NUMBER_OF_BLOCKS * MAX_BLOCK_SIZE}
    * @throws IllegalStateException if the number of bytes uploaded differs from the content length
    */
   @Override
   public String execute(String container, Blob blob) {
      String blobName = blob.getMetadata().getName();
      Payload payload = blob.getPayload();
      Long length = payload.getContentMetadata().getContentLength();
      checkNotNull(length,
            "please invoke payload.getContentMetadata().setContentLength(length) prior to azure block upload");
      // int * long is evaluated as long, so the product cannot overflow.
      long maxLength = MAX_NUMBER_OF_BLOCKS * MAX_BLOCK_SIZE;
      checkArgument(length <= maxLength,
            "content length %s exceeds the maximum of %s bytes for an azure block upload", length, maxLength);

      List<String> blockIds = Lists.newArrayList();
      long bytesWritten = 0;
      for (Payload block : slicer.slice(payload, MAX_BLOCK_SIZE)) {
         // The random suffix gives every block a distinct name; hashing it with MD5 yields ids
         // that all have the same encoded length.
         String blockName = blobName + "-" + UUID.randomUUID().toString();
         // Encode with an explicit charset so the id bytes do not depend on the platform default.
         byte[] blockIdBytes = Hashing.md5()
               .hashBytes(blockName.getBytes(java.nio.charset.StandardCharsets.UTF_8))
               .asBytes();
         String blockId = BaseEncoding.base64().encode(blockIdBytes);
         blockIds.add(blockId);
         client.putBlock(container, blobName, blockId, block);
         bytesWritten += block.getContentMetadata().getContentLength();
      }
      checkState(bytesWritten == length, "Wrote %s bytes, but we wanted to write %s bytes", bytesWritten, length);
      return client.putBlockList(container, blobToAzureBlob.apply(blob), blockIds);
   }
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.azureblob.blobstore.strategy;
import com.google.inject.ImplementedBy;
import org.jclouds.blobstore.domain.Blob;
/**
 * Strategy for uploading a blob to a container as a sequence of blocks.
 *
 * @see <a href="http://msdn.microsoft.com/en-us/library/windowsazure/dd135726.aspx">Azure Put Block Documentation</a>
 */
@ImplementedBy(AzureBlobBlockUploadStrategy.class)
public interface MultipartUploadStrategy {

   /** Maximum number of blocks per upload. */
   int MAX_NUMBER_OF_BLOCKS = 50 * 1000;

   /** Maximum block size (4 * 1024 * 1024). */
   long MAX_BLOCK_SIZE = 4L * 1024L * 1024L;

   /**
    * Uploads {@code blob} into {@code container} using multiple blocks.
    *
    * @param container name of the destination container
    * @param blob blob to upload
    * @return a string identifying the stored blob (presumably its ETag; confirm against the
    *         implementation)
    */
   String execute(String container, Blob blob);
}

View File

@ -16,28 +16,18 @@
*/ */
package org.jclouds.azureblob.blobstore.integration; package org.jclouds.azureblob.blobstore.integration;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import org.jclouds.azureblob.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest; import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.utils.TestUtils;
import org.testng.SkipException; import org.testng.SkipException;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@Test(groups = "live") @Test(groups = "live")
public class AzureBlobIntegrationLiveTest extends BaseBlobIntegrationTest { public class AzureBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
@Override @Override
protected long getMinimumMultipartBlobSize() { protected long getMinimumMultipartBlobSize() {
return MultipartUploadStrategy.MAX_BLOCK_SIZE + 1; return view.getBlobStore().getMaximumMultipartPartSize() + 1;
} }
public AzureBlobIntegrationLiveTest() { public AzureBlobIntegrationLiveTest() {
@ -64,23 +54,4 @@ public class AzureBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
public void testSetBlobAccess() throws Exception { public void testSetBlobAccess() throws Exception {
throw new SkipException("unsupported in Azure"); throw new SkipException("unsupported in Azure");
} }
public void testMultipartChunkedFileStreamPowerOfTwoSize() throws IOException, InterruptedException {
final long limit = MultipartUploadStrategy.MAX_BLOCK_SIZE;
ByteSource input = TestUtils.randomByteSource().slice(0, limit);
File file = new File("target/const.txt");
input.copyTo(Files.asByteSink(file));
String containerName = getContainerName();
try {
BlobStore blobStore = view.getBlobStore();
blobStore.createContainerInLocation(null, containerName);
Blob blob = blobStore.blobBuilder("const.txt").payload(file).build();
String expected = blobStore.putBlob(containerName, blob, PutOptions.Builder.multipart());
String etag = blobStore.blobMetadata(containerName, "const.txt").getETag();
assertEquals(etag, expected);
} finally {
returnContainer(containerName);
}
}
} }

View File

@ -1,123 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.azureblob.blobstore.strategy;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import org.easymock.EasyMock;
import org.jclouds.azureblob.AzureBlobClient;
import org.jclouds.azureblob.blobstore.functions.BlobToAzureBlob;
import org.jclouds.azureblob.domain.AzureBlob;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.MutableBlobMetadata;
import org.jclouds.blobstore.domain.internal.BlobImpl;
import org.jclouds.blobstore.domain.internal.MutableBlobMetadataImpl;
import org.jclouds.io.MutableContentMetadata;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.BaseMutableContentMetadata;
import org.jclouds.io.payloads.ByteSourcePayload;
import org.testng.annotations.Test;
import java.util.List;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.testng.Assert.assertEquals;
/**
 * Unit tests for {@link AzureBlobBlockUploadStrategy} using mocked collaborators.
 */
@Test(groups = "unit", testName = "AzureBlobBlockUploadStrategyTest")
public class AzureBlobBlockUploadStrategyTest {

   /** Each slice must be uploaded as its own block, after which the block list is committed. */
   public void testExecute() throws Exception {
      String container = "test-container";
      String blobName = "test-blob";
      byte[] blobData = "ABCD".getBytes(Charsets.UTF_8);

      AzureBlobClient client = createMock(AzureBlobClient.class);
      PayloadSlicer slicer = createMock(PayloadSlicer.class);
      BlobToAzureBlob blobToAzureBlob = createMock(BlobToAzureBlob.class);
      AzureBlob azureBlob = createMock(AzureBlob.class);

      Blob blob = newBlob(blobName, blobData, blobData.length);
      Payload payload = blob.getPayload();

      // One single-byte slice per input byte.
      List<Payload> payloads = ImmutableList.of(
            createBlockPayload(new byte[]{blobData[0]}),
            createBlockPayload(new byte[]{blobData[1]}),
            createBlockPayload(new byte[]{blobData[2]}),
            createBlockPayload(new byte[]{blobData[3]}));

      expect(slicer.slice(payload, MultipartUploadStrategy.MAX_BLOCK_SIZE)).andReturn(payloads);
      for (Payload block : payloads) {
         client.putBlock(eq(container), eq(blobName), anyObject(String.class), eq(block));
      }
      // Give the converter a real expectation so it is not invoked while still in record state.
      expect(blobToAzureBlob.apply(blob)).andReturn(azureBlob);
      expect(client.putBlockList(eq(container), eq(azureBlob), EasyMock.<List<String>>anyObject()))
            .andReturn("Fake ETAG");

      AzureBlobBlockUploadStrategy strat = new AzureBlobBlockUploadStrategy(client, slicer, blobToAzureBlob);
      replay(slicer, client, blobToAzureBlob, azureBlob);

      String etag = strat.execute(container, blob);

      assertEquals(etag, "Fake ETAG");
      // Verify every collaborator, not just the client, so a skipped slice or conversion fails.
      verify(slicer, client, blobToAzureBlob);
   }

   /** A content length one byte above the block limit must be rejected before any upload. */
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testExceededContentLengthLimit() throws Exception {
      String container = "test-container";
      String blobName = "test-blob";
      AzureBlobClient client = createNiceMock(AzureBlobClient.class);
      PayloadSlicer slicer = createNiceMock(PayloadSlicer.class);
      BlobToAzureBlob blobToAzureBlob = createMock(BlobToAzureBlob.class);

      long tooLong = MultipartUploadStrategy.MAX_BLOCK_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_BLOCKS + 1;
      Blob blob = newBlob(blobName, "ABCD".getBytes(Charsets.UTF_8), tooLong);

      AzureBlobBlockUploadStrategy strat = new AzureBlobBlockUploadStrategy(client, slicer, blobToAzureBlob);
      strat.execute(container, blob);
   }

   /**
    * Builds a blob named {@code name} whose payload wraps {@code data} and whose metadata and
    * payload share one content-metadata object declaring {@code contentLength}.
    */
   private static Blob newBlob(String name, byte[] data, long contentLength) {
      MutableContentMetadata contentMetadata = new BaseMutableContentMetadata();
      contentMetadata.setContentLength(contentLength);

      MutableBlobMetadata metadata = new MutableBlobMetadataImpl();
      metadata.setName(name);
      metadata.setContentMetadata(contentMetadata);

      Blob blob = new BlobImpl(metadata);
      Payload payload = Payloads.newByteSourcePayload(ByteSource.wrap(data));
      payload.setContentMetadata(contentMetadata);
      blob.setPayload(payload);
      return blob;
   }

   /** Wraps {@code blockData} in a payload whose content length equals the array length. */
   private Payload createBlockPayload(byte[] blockData) {
      ByteSourcePayload payload = Payloads.newByteSourcePayload(ByteSource.wrap(blockData));
      MutableContentMetadata contentMetadata = new BaseMutableContentMetadata();
      contentMetadata.setContentLength((long) blockData.length);
      payload.setContentMetadata(contentMetadata);
      return payload;
   }
}