mirror of https://github.com/apache/jclouds.git
First working implementation of swift multipart upload. Async client TDB.
This commit is contained in:
parent
1d5c0ed5eb
commit
5f83e6af3d
|
@ -52,14 +52,7 @@ import org.jclouds.openstack.swift.functions.ParseObjectInfoFromHeaders;
|
|||
import org.jclouds.openstack.swift.functions.ParseObjectInfoListFromJsonResponse;
|
||||
import org.jclouds.openstack.swift.functions.ReturnTrueOn404FalseOn409;
|
||||
import org.jclouds.openstack.swift.options.ListContainerOptions;
|
||||
import org.jclouds.rest.annotations.BinderParam;
|
||||
import org.jclouds.rest.annotations.Endpoint;
|
||||
import org.jclouds.rest.annotations.ExceptionParser;
|
||||
import org.jclouds.rest.annotations.ParamParser;
|
||||
import org.jclouds.rest.annotations.QueryParams;
|
||||
import org.jclouds.rest.annotations.RequestFilters;
|
||||
import org.jclouds.rest.annotations.ResponseParser;
|
||||
import org.jclouds.rest.annotations.SkipEncoding;
|
||||
import org.jclouds.rest.annotations.*;
|
||||
import org.jclouds.rest.functions.ReturnVoidOnNotFoundOr404;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
@ -183,4 +176,11 @@ public interface CommonSwiftAsyncClient {
|
|||
@Path("/{container}/{name}")
|
||||
ListenableFuture<Void> removeObject(@PathParam("container") String container, @PathParam("name") String name);
|
||||
|
||||
@PUT
|
||||
@Path("/{container}/{name}")
|
||||
@ResponseParser(ParseETagHeader.class)
|
||||
@Headers(keys = "X-Object-Manifest", values="{container}/{name}")
|
||||
ListenableFuture<String> putObjectManifest(@PathParam("container") String container,
|
||||
@PathParam("name") String name);
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.jclouds.blobstore.ContainerNotFoundException;
|
|||
import org.jclouds.blobstore.domain.PageSet;
|
||||
import org.jclouds.concurrent.Timeout;
|
||||
import org.jclouds.http.options.GetOptions;
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.openstack.swift.domain.AccountMetadata;
|
||||
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
||||
|
@ -113,4 +114,5 @@ public interface CommonSwiftClient {
|
|||
*/
|
||||
boolean objectExists(String container, String name);
|
||||
|
||||
String putObjectManifest(String container, String name);
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
|||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
||||
import org.jclouds.openstack.swift.domain.ObjectInfo;
|
||||
|
@ -81,6 +82,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
private final ObjectToBlobMetadata object2BlobMd;
|
||||
private final BlobToHttpGetOptions blob2ObjectGetOptions;
|
||||
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
|
||||
//private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
||||
|
||||
@Inject
|
||||
protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||
|
@ -102,6 +104,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
this.object2BlobMd = object2BlobMd;
|
||||
this.blob2ObjectGetOptions = blob2ObjectGetOptions;
|
||||
this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
|
||||
//this.multipartUploadStrategy = multipartUploadStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -239,7 +242,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
@Override
|
||||
public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
|
||||
// TODO implement options
|
||||
return putBlob(container, blob);
|
||||
//if (options.isMultipart()) {
|
||||
// return null; //Lis multipartUploadStrategy.get().execute(container, blob, options);
|
||||
//} else {
|
||||
return putBlob(container, blob);
|
||||
//}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -212,7 +212,7 @@ public class SwiftBlobStore extends BaseBlobStore {
|
|||
public String putBlob(String container, Blob blob, PutOptions options) {
|
||||
// TODO implement options
|
||||
if (options.isMultipart()) {
|
||||
return multipartUploadStrategy.get().execute(container, blob, options);
|
||||
return multipartUploadStrategy.get().execute(container, blob, options, blob2Object);
|
||||
} else {
|
||||
return putBlob(container, blob);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
/*
|
||||
* MultipartUploadSlicingAlgorithm.java
|
||||
*
|
||||
*
|
||||
* Created by: tibor
|
||||
*
|
||||
* History
|
||||
*
|
||||
*/
|
||||
|
||||
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.inject.Inject;
|
||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Named;
|
||||
|
||||
public class MultipartUploadSlicingAlgorithm {
|
||||
|
||||
@Resource
|
||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
@VisibleForTesting
|
||||
static final long DEFAULT_PART_SIZE = 33554432; // 32MB
|
||||
|
||||
@VisibleForTesting
|
||||
static final int DEFAULT_MAGNITUDE_BASE = 100;
|
||||
|
||||
@Inject(optional = true)
|
||||
@Named("jclouds.mpu.parts.size")
|
||||
@VisibleForTesting
|
||||
long defaultPartSize = DEFAULT_PART_SIZE;
|
||||
|
||||
@Inject(optional = true)
|
||||
@Named("jclouds.mpu.parts.magnitude")
|
||||
@VisibleForTesting
|
||||
int magnitudeBase = DEFAULT_MAGNITUDE_BASE;
|
||||
|
||||
// calculated only once, but not from the constructor
|
||||
private volatile int parts; // required number of parts with chunkSize
|
||||
private volatile long chunkSize;
|
||||
private volatile long remaining; // number of bytes remained for the last part
|
||||
|
||||
// sequentially updated values
|
||||
private volatile int part;
|
||||
private volatile long chunkOffset;
|
||||
private volatile long copied;
|
||||
|
||||
@VisibleForTesting
|
||||
protected long calculateChunkSize(long length) {
|
||||
long unitPartSize = defaultPartSize; // first try with default part size
|
||||
int parts = (int)(length / unitPartSize);
|
||||
long partSize = unitPartSize;
|
||||
int magnitude = (int) (parts / magnitudeBase);
|
||||
if (magnitude > 0) {
|
||||
partSize = magnitude * unitPartSize;
|
||||
if (partSize > MultipartUpload.MAX_PART_SIZE) {
|
||||
partSize = MultipartUpload.MAX_PART_SIZE;
|
||||
unitPartSize = MultipartUpload.MAX_PART_SIZE;
|
||||
}
|
||||
parts = (int)(length / partSize);
|
||||
if (parts * partSize < length) {
|
||||
partSize = (magnitude + 1) * unitPartSize;
|
||||
if (partSize > MultipartUpload.MAX_PART_SIZE) {
|
||||
partSize = MultipartUpload.MAX_PART_SIZE;
|
||||
unitPartSize = MultipartUpload.MAX_PART_SIZE;
|
||||
}
|
||||
parts = (int)(length / partSize);
|
||||
}
|
||||
}
|
||||
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
|
||||
// cannot be split
|
||||
unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size
|
||||
parts = (int)(length / unitPartSize);
|
||||
}
|
||||
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
|
||||
parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
|
||||
// covering
|
||||
}
|
||||
long remainder = length % unitPartSize;
|
||||
if (remainder == 0 && parts > 0) {
|
||||
parts -= 1;
|
||||
}
|
||||
this.chunkSize = partSize;
|
||||
this.parts = parts;
|
||||
this.remaining = length - partSize * parts;
|
||||
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
|
||||
remaining, (remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : ""));
|
||||
return this.chunkSize;
|
||||
}
|
||||
|
||||
public long getCopied() {
|
||||
return copied;
|
||||
}
|
||||
|
||||
public void setCopied(long copied) {
|
||||
this.copied = copied;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected int getParts() {
|
||||
return parts;
|
||||
}
|
||||
|
||||
protected int getNextPart() {
|
||||
return ++part;
|
||||
}
|
||||
|
||||
protected void addCopied(long copied) {
|
||||
this.copied += copied;
|
||||
}
|
||||
|
||||
protected long getNextChunkOffset() {
|
||||
long next = chunkOffset;
|
||||
chunkOffset += getChunkSize();
|
||||
return next;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected long getChunkSize() {
|
||||
return chunkSize;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected long getRemaining() {
|
||||
return remaining;
|
||||
}
|
||||
|
||||
}
|
|
@ -3,10 +3,11 @@ package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
|||
import com.google.inject.ImplementedBy;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.options.PutOptions;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||
import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload;
|
||||
|
||||
@ImplementedBy(SequentialMultipartUploadStrategy.class)
|
||||
public interface MultipartUploadStrategy extends MultipartUpload {
|
||||
|
||||
String execute(String container, Blob blob, PutOptions options);
|
||||
String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object);
|
||||
}
|
||||
|
|
|
@ -3,44 +3,91 @@ package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
|||
import javax.annotation.Resource;
|
||||
import javax.inject.Named;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.inject.Inject;
|
||||
import org.jclouds.blobstore.KeyNotFoundException;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.options.PutOptions;
|
||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.io.PayloadSlicer;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.openstack.swift.SwiftClient;
|
||||
import org.jclouds.openstack.swift.blobstore.SwiftBlobStore;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||
import org.jclouds.openstack.swift.domain.SwiftObject;
|
||||
import org.jclouds.util.Throwables2;
|
||||
|
||||
import java.util.SortedMap;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
|
||||
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
|
||||
@Resource
|
||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||
protected Logger logger = Logger.NULL;
|
||||
public static final String PART_SEPARATOR = "/";
|
||||
|
||||
protected final SwiftBlobStore ablobstore;
|
||||
protected final PayloadSlicer slicer;
|
||||
@Resource
|
||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
protected final SwiftBlobStore ablobstore;
|
||||
protected final PayloadSlicer slicer;
|
||||
|
||||
@Inject
|
||||
public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) {
|
||||
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||
this.slicer = checkNotNull(slicer, "slicer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String execute(String container, Blob blob, PutOptions options) {
|
||||
public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) {
|
||||
System.out.println("here we go");
|
||||
String key = blob.getMetadata().getName();
|
||||
Payload payload = blob.getPayload();
|
||||
/*MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||
algorithm
|
||||
.calculateChunkSize(checkNotNull(
|
||||
payload.getContentMetadata().getContentLength(),
|
||||
"contentLength required on all uploads to amazon s3; please invoke payload.getContentMetadata().setContentLength(length) first"));
|
||||
"contentLength required on all uploads to swift; please invoke payload.getContentMetadata().setContentLength(length) first"));
|
||||
int parts = algorithm.getParts();
|
||||
long chunkSize = algorithm.getChunkSize();
|
||||
if (parts > 0) {
|
||||
SwiftClient client = (SwiftClient) ablobstore.getContext()
|
||||
.getProviderSpecificContext().getApi();
|
||||
|
||||
} */
|
||||
try {
|
||||
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
||||
int part;
|
||||
while ((part = algorithm.getNextPart()) <= parts) {
|
||||
System.out.println("Uploading part " + part);
|
||||
Payload chunkedPart = slicer.slice(payload,
|
||||
algorithm.getNextChunkOffset(), chunkSize);
|
||||
Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR +
|
||||
String.valueOf(part)).payload(chunkedPart).contentDisposition(
|
||||
blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build();
|
||||
client.putObject(container, blob2Object.apply(blobPart));
|
||||
}
|
||||
long remaining = algorithm.getRemaining();
|
||||
if (remaining > 0) {
|
||||
System.out.println("Uploading tail.");
|
||||
Payload chunkedPart = slicer.slice(payload,
|
||||
algorithm.getNextChunkOffset(), remaining);
|
||||
Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR +
|
||||
String.valueOf(part)).payload(chunkedPart).contentDisposition(
|
||||
blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build();
|
||||
client.putObject(container, blob2Object.apply(blobPart));
|
||||
}
|
||||
return client.putObjectManifest(container, key);
|
||||
} catch (Exception ex) {
|
||||
RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
|
||||
if (rtex == null) {
|
||||
rtex = new RuntimeException(ex);
|
||||
}
|
||||
//client.abortMultipartUpload(container, key, uploadId);
|
||||
throw rtex;
|
||||
}
|
||||
|
||||
}
|
||||
return "NOT IMPLEMENTED";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
import javax.ws.rs.PathParam;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.blobstore.TransientAsyncBlobStore;
|
||||
|
@ -167,7 +168,12 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient {
|
|||
return blobStore.removeBlob(container, key);
|
||||
}
|
||||
|
||||
public ListenableFuture<Boolean> setObjectInfo(String container, String key, Map<String, String> userMetadata) {
|
||||
@Override
|
||||
public ListenableFuture<String> putObjectManifest(String container, String name) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public ListenableFuture<Boolean> setObjectInfo(String container, String key, Map<String, String> userMetadata) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -179,6 +185,11 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient {
|
|||
return objectProvider.create(null);
|
||||
}
|
||||
|
||||
|
||||
/*public String putObjectManifest(String container, String name) {
|
||||
return "stub";
|
||||
} */
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Boolean> objectExists(String bucketName, String key) {
|
||||
return immediateFuture(containerToBlobs.get(bucketName).containsKey(key));
|
||||
|
|
|
@ -56,6 +56,7 @@ public interface AWSS3Client extends S3Client {
|
|||
*/
|
||||
String initiateMultipartUpload(String bucketName, ObjectMetadata objectMetadata, PutObjectOptions... options);
|
||||
|
||||
|
||||
/**
|
||||
* This operation aborts a multipart upload. After a multipart upload is aborted, no additional
|
||||
* parts can be uploaded using that upload ID. The storage consumed by any previously uploaded
|
||||
|
|
Loading…
Reference in New Issue