JCLOUDS-457: BlobStore MultiPartUpload strategy

The code related to the MultiPartUpload strategy has been added.
MultiPart uploads use an upload strategy (e.g. sequential vs parallel)
and also a slicing strategy to split the payload in different parts.
This commit is contained in:
Roman Coedo 2014-07-19 01:47:14 +02:00 committed by Andrew Gaul
parent 1359ba9010
commit cdbb845ae7
9 changed files with 517 additions and 2 deletions

View File

@ -35,19 +35,24 @@ import org.jclouds.collect.Memoized;
import org.jclouds.crypto.Crypto;
import org.jclouds.domain.Location;
import org.jclouds.glacier.GlacierClient;
import org.jclouds.glacier.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.javax.annotation.Nullable;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.google.inject.Provider;
public class GlacierBlobStore extends BaseBlobStore {
private final GlacierClient sync;
private final Crypto crypto;
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
@Inject
GlacierBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, GlacierClient sync, Crypto crypto) {
@Memoized Supplier<Set<? extends Location>> locations, GlacierClient sync, Crypto crypto,
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, defaultLocation, locations);
this.multipartUploadStrategy = checkNotNull(multipartUploadStrategy, "multipartUploadStrategy");
this.sync = checkNotNull(sync, "sync");
this.crypto = checkNotNull(crypto, "crypto");
}
@ -95,7 +100,10 @@ public class GlacierBlobStore extends BaseBlobStore {
@Override
public String putBlob(String container, Blob blob, PutOptions options) {
throw new UnsupportedOperationException();
if (options.isMultipart()) {
return multipartUploadStrategy.get().execute(container, blob);
}
return putBlob(container, blob);
}
@Override

View File

@ -21,6 +21,10 @@ import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.attr.ConsistencyModel;
import org.jclouds.glacier.blobstore.GlacierAsyncBlobStore;
import org.jclouds.glacier.blobstore.GlacierBlobStore;
import org.jclouds.glacier.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.glacier.blobstore.strategy.SlicingStrategy;
import org.jclouds.glacier.blobstore.strategy.internal.BaseSlicingStrategy;
import org.jclouds.glacier.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
import com.google.inject.AbstractModule;
@ -30,5 +34,7 @@ public class GlacierBlobStoreContextModule extends AbstractModule {
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(BlobStore.class).to(GlacierBlobStore.class);
bind(AsyncBlobStore.class).to(GlacierAsyncBlobStore.class);
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
bind(SlicingStrategy.class).to(BaseSlicingStrategy.class);
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy;
import org.jclouds.blobstore.domain.Blob;
public interface MultipartUploadStrategy {
String execute(String container, Blob blob);
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.jclouds.glacier.util.ContentRange;
import org.jclouds.io.Payload;
public class PayloadSlice {
private final Payload payload;
private final ContentRange range;
private final int part;
public PayloadSlice(Payload payload, ContentRange range, int part) {
this.payload = checkNotNull(payload, "payload");
this.range = checkNotNull(range, "range");
checkArgument(part >= 0, "The part number cannot be negative");
this.part = part;
}
public Payload getPayload() {
return payload;
}
public ContentRange getRange() {
return range;
}
public int getPart() {
return part;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy;
import org.jclouds.io.Payload;
public interface SlicingStrategy {
public static final int MAX_LIST_PARTS_RETURNED = 1000;
public static final int MAX_LIST_MPU_RETURNED = 1000;
public static final int MAX_NUMBER_OF_PARTS = 10000;
public static final long MIN_PART_SIZE = 1L << 20; //1 MB, last part can be < 1 MB
public static final long MAX_PART_SIZE = 1L << 32; //4 GB
void startSlicing(Payload payload);
PayloadSlice nextSlice();
boolean hasNext();
long getPartSizeInMB();
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.Math.sqrt;
import org.jclouds.glacier.blobstore.strategy.PayloadSlice;
import org.jclouds.glacier.blobstore.strategy.SlicingStrategy;
import org.jclouds.glacier.util.ContentRange;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import com.google.inject.Inject;
import com.google.inject.name.Named;
public class BaseSlicingStrategy implements SlicingStrategy {
public static final double DEFAULT_RATIO = 0.32; // (part size/number of parts) ratio
@Inject(optional = true)
@Named("jclouds.mpu.part.ratio")
private final double ratio = DEFAULT_RATIO;
private final PayloadSlicer slicer;
private Payload payload;
private volatile long partSizeInMB;
private volatile long total;
private volatile long copied;
private volatile int part;
@Inject
public BaseSlicingStrategy(PayloadSlicer slicer) {
this.slicer = checkNotNull(slicer, "slicer");
this.total = 0;
this.copied = 0;
this.partSizeInMB = 0;
this.part = 0;
}
protected long calculatePartSize(long length) {
long lengthInMB = (long) (length / (1L << 20)) + 1;
double fpPartSizeInMB = sqrt(ratio * lengthInMB); //Get the part size which matches the given ratio
long partSizeInMB = Long.highestOneBit((long) fpPartSizeInMB - 1) << 1;
if (partSizeInMB < 1) return 1;
else if (partSizeInMB > MAX_PART_SIZE) return MAX_PART_SIZE;
return partSizeInMB;
}
public long getRemaining() {
return total - copied;
}
@Override
public void startSlicing(Payload payload) {
this.payload = checkNotNull(payload, "payload");
this.copied = 0;
this.total = checkNotNull(payload.getContentMetadata().getContentLength(), "contentLength");
this.partSizeInMB = calculatePartSize(total);
this.part = 0;
}
@Override
public PayloadSlice nextSlice() {
checkNotNull(this.payload, "payload");
long sliceLength = Math.min(getRemaining(), partSizeInMB << 20);
Payload slicedPayload = slicer.slice(payload, copied, sliceLength);
ContentRange range = ContentRange.build(copied, copied + sliceLength - 1);
copied += sliceLength;
part++;
return new PayloadSlice(slicedPayload, range, part);
}
@Override
public boolean hasNext() {
return this.getRemaining() != 0;
}
@Override
public long getPartSizeInMB() {
return partSizeInMB;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.glacier.GlacierClient;
import org.jclouds.glacier.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.glacier.blobstore.strategy.PayloadSlice;
import org.jclouds.glacier.blobstore.strategy.SlicingStrategy;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.HashCode;
import com.google.inject.Inject;
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
private final GlacierClient client;
private final SlicingStrategy slicer;
@Inject
public SequentialMultipartUploadStrategy(GlacierClient client, SlicingStrategy slicer) {
this.client = checkNotNull(client, "client");
this.slicer = checkNotNull(slicer, "slicer");
}
@Override
public String execute(String container, Blob blob) {
slicer.startSlicing(blob.getPayload());
String uploadId = client.initiateMultipartUpload(container, slicer.getPartSizeInMB(),
blob.getMetadata().getName());
try {
ImmutableMap.Builder<Integer, HashCode> hashes = ImmutableMap.builder();
while (slicer.hasNext()) {
PayloadSlice slice = slicer.nextSlice();
hashes.put(slice.getPart(),
client.uploadPart(container, uploadId, slice.getRange(), slice.getPayload()));
}
return client.completeMultipartUpload(container, uploadId, hashes.build(),
blob.getPayload().getContentMetadata().getContentLength());
} catch (RuntimeException exception) {
client.abortMultipartUpload(container, uploadId);
throw exception;
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.jclouds.glacier.util.TestUtils.MiB;
import static org.jclouds.glacier.util.TestUtils.GiB;
import static org.jclouds.glacier.util.TestUtils.buildPayload;
import org.jclouds.glacier.blobstore.strategy.PayloadSlice;
import org.jclouds.glacier.util.ContentRange;
import org.jclouds.io.internal.BasePayloadSlicer;
import org.testng.annotations.Test;
@Test(groups = {"unit"})
public class BaseSlicingStrategyTest {
@Test
public void slicing100MBTest() {
BaseSlicingStrategy slicer = new BaseSlicingStrategy(new BasePayloadSlicer());
slicer.startSlicing(buildPayload(100 * MiB));
long offset = 0;
while (slicer.hasNext()) {
PayloadSlice slice = slicer.nextSlice();
long expectedLength = (slicer.hasNext() ? 8 : 4) * MiB;
assertThat(slice.getPayload().getContentMetadata().getContentLength()).isEqualTo(expectedLength);
assertThat(slice.getRange()).isEqualTo(ContentRange.build(offset, offset + expectedLength - 1));
offset += expectedLength;
}
}
@Test
public void slicing2000MBTest() {
BaseSlicingStrategy slicer = new BaseSlicingStrategy(new BasePayloadSlicer());
slicer.startSlicing(buildPayload(2000 * MiB));
long offset = 0;
while (slicer.hasNext()) {
PayloadSlice slice = slicer.nextSlice();
long expectedLength = (slicer.hasNext() ? 32 : 16) * MiB;
assertThat(slice.getPayload().getContentMetadata().getContentLength()).isEqualTo(expectedLength);
assertThat(slice.getRange()).isEqualTo(ContentRange.build(offset, offset + expectedLength - 1));
offset += expectedLength;
}
}
@Test
public void slicing2MBTest() {
BaseSlicingStrategy slicer = new BaseSlicingStrategy(new BasePayloadSlicer());
slicer.startSlicing(buildPayload(2 * MiB));
long offset = 0;
while (slicer.hasNext()) {
PayloadSlice slice = slicer.nextSlice();
long expectedLength = 1 * MiB;
assertThat(slice.getPayload().getContentMetadata().getContentLength()).isEqualTo(expectedLength);
assertThat(slice.getRange()).isEqualTo(ContentRange.build(offset, offset + expectedLength - 1));
offset += expectedLength;
}
}
@Test
public void slicing40000GBTest() {
BaseSlicingStrategy slicer = new BaseSlicingStrategy(new BasePayloadSlicer());
slicer.startSlicing(buildPayload(40000 * GiB));
long offset = 0;
while (slicer.hasNext()) {
PayloadSlice slice = slicer.nextSlice();
long expectedLength = 4096 * MiB;
assertThat(slice.getPayload().getContentMetadata().getContentLength()).isEqualTo(expectedLength);
assertThat(slice.getRange()).isEqualTo(ContentRange.build(offset, offset + expectedLength - 1));
offset += expectedLength;
}
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy.internal;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.jclouds.Constants.PROPERTY_MAX_RETRIES;
import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT;
import static org.jclouds.glacier.reference.GlacierHeaders.ARCHIVE_DESCRIPTION;
import static org.jclouds.glacier.reference.GlacierHeaders.ARCHIVE_ID;
import static org.jclouds.glacier.reference.GlacierHeaders.ARCHIVE_SIZE;
import static org.jclouds.glacier.reference.GlacierHeaders.MULTIPART_UPLOAD_ID;
import static org.jclouds.glacier.reference.GlacierHeaders.PART_SIZE;
import static org.jclouds.glacier.reference.GlacierHeaders.TREE_HASH;
import static org.jclouds.glacier.util.TestUtils.MiB;
import static org.jclouds.glacier.util.TestUtils.buildPayload;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
import java.util.Set;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.domain.internal.BlobBuilderImpl;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import org.jclouds.glacier.GlacierClient;
import org.jclouds.http.HttpResponseException;
import org.jclouds.io.internal.BasePayloadSlicer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashCode;
import com.google.common.net.HttpHeaders;
import com.google.inject.Module;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
@Test(groups = {"mock"}, singleThreaded = true)
public class MultipartUploadStrategyMockTest {
private static final Set<Module> modules = ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor(),
sameThreadExecutor()));
private static HashCode hash8 = HashCode.fromString("c87a460c93d4a8ffcf09a9a236cc17a486d7ed8a1a2f48e9c361c5f7ac0f1280");
private static HashCode hash4 = HashCode.fromString("9491cb2ed1d4e7cd53215f4017c23ec4ad21d7050a1e6bb636c4f67e8cddb844");
private static HashCode hcomp = HashCode.fromString("e196b8ae66b4e55a10c84647957c1291c84ffafa44bfdb88d87f0456e5399e46");
MockWebServer server;
GlacierClient client;
private static GlacierClient getGlacierClient(URL server) {
Properties overrides = new Properties();
// prevent expect-100 bug http://code.google.com/p/mockwebserver/issues/detail?id=6
overrides.setProperty(PROPERTY_SO_TIMEOUT, "0");
overrides.setProperty(PROPERTY_MAX_RETRIES, "1");
return ContextBuilder.newBuilder("glacier").credentials("accessKey", "secretKey").endpoint(server.toString())
.modules(modules).overrides(overrides).buildApi(GlacierClient.class);
}
@BeforeMethod
private void initServer() throws IOException {
server = new MockWebServer();
server.play();
client = getGlacierClient(server.getUrl("/"));
}
@AfterMethod
private void shutdownServer() throws IOException {
server.shutdown();
}
@Test
public void testSequentialMPU() throws IOException, InterruptedException {
server.enqueue(new MockResponse().setResponseCode(201).addHeader(MULTIPART_UPLOAD_ID, "upload-id"));
for (int i = 0; i < 12; i++) {
server.enqueue(new MockResponse().setResponseCode(204).addHeader(TREE_HASH, hash8));
}
server.enqueue(new MockResponse().setResponseCode(204).addHeader(TREE_HASH, hash4));
server.enqueue(new MockResponse().setResponseCode(201).addHeader(ARCHIVE_ID, "archive-id"));
SequentialMultipartUploadStrategy strat = new SequentialMultipartUploadStrategy(client,
new BaseSlicingStrategy(new BasePayloadSlicer()));
assertThat(strat.execute("vault", new BlobBuilderImpl().name("test").payload(buildPayload(100 * MiB)).build()))
.isEqualTo("archive-id");
RecordedRequest initiate = server.takeRequest();
assertThat(initiate.getRequestLine()).isEqualTo("POST /-/vaults/vault/multipart-uploads HTTP/1.1");
assertThat(initiate.getHeader(ARCHIVE_DESCRIPTION)).isEqualTo("test");
assertThat(Long.parseLong(initiate.getHeader(PART_SIZE))).isEqualTo(8 * MiB);
RecordedRequest p1 = server.takeRequest();
assertThat(p1.getRequestLine())
.isEqualTo("PUT /-/vaults/vault/multipart-uploads/upload-id HTTP/1.1");
assertThat(Long.parseLong(p1.getHeader(HttpHeaders.CONTENT_LENGTH))).isEqualTo(8388608);
assertThat(HashCode.fromString(p1.getHeader(TREE_HASH))).isEqualTo(hash8);
for (int i = 0; i < 11; i++) {
server.takeRequest();
}
RecordedRequest p13 = server.takeRequest();
assertThat(p13.getRequestLine())
.isEqualTo("PUT /-/vaults/vault/multipart-uploads/upload-id HTTP/1.1");
assertThat(HashCode.fromString(p13.getHeader(TREE_HASH))).isEqualTo(hash4);
assertThat(Long.parseLong(p13.getHeader(HttpHeaders.CONTENT_LENGTH))).isEqualTo(4194304);
RecordedRequest complete = server.takeRequest();
assertThat(complete.getRequestLine()).isEqualTo("POST /-/vaults/vault/multipart-uploads/upload-id HTTP/1.1");
assertThat(HashCode.fromString(complete.getHeader(TREE_HASH))).isEqualTo(hcomp);
assertThat(Long.parseLong(complete.getHeader(ARCHIVE_SIZE))).isEqualTo(100 * MiB);
}
@Test(expectedExceptions = HttpResponseException.class)
public void testSequentialMPUAbort() throws InterruptedException {
server.enqueue(new MockResponse().setResponseCode(201).addHeader(MULTIPART_UPLOAD_ID, "upload-id"));
server.enqueue(new MockResponse().setResponseCode(204).addHeader(TREE_HASH, hash8));
server.enqueue(new MockResponse().setResponseCode(404));
server.enqueue(new MockResponse().setResponseCode(204));
SequentialMultipartUploadStrategy strat = new SequentialMultipartUploadStrategy(client,
new BaseSlicingStrategy(new BasePayloadSlicer()));
try {
strat.execute("vault", new BlobBuilderImpl().name("test").payload(buildPayload(100 * MiB)).build());
} finally {
server.takeRequest();
server.takeRequest();
server.takeRequest();
RecordedRequest abort = server.takeRequest();
assertThat(abort.getRequestLine()).isEqualTo("DELETE /-/vaults/vault/multipart-uploads/upload-id HTTP/1.1");
}
}
}