HDDS-987. MultipartUpload: S3API for list parts of a object. Contributed by Bharat Viswanadham.

This commit is contained in:
Márton Elek 2019-02-05 12:45:43 +01:00
parent f365957c63
commit 9ace37b119
8 changed files with 510 additions and 6 deletions

View File

@ -47,6 +47,10 @@ public class OzoneMultipartUploadPartListParts {
this.truncated = truncate;
}
public void addAllParts(List<PartInfo> partInfos) {
partInfoList.addAll(partInfos);
}
public void addPart(PartInfo partInfo) {
this.partInfoList.add(partInfo);
}

View File

@ -160,4 +160,48 @@ Test abort Multipart upload with invalid uploadId
Upload part with Incorrect uploadID
Execute echo "Multipart upload" > /tmp/testfile
${result} = Execute AWSS3APICli and checkrc upload-part --bucket ${BUCKET} --key multipartKey --part-number 1 --body /tmp/testfile --upload-id "random" 255
Should contain ${result} NoSuchUpload
Should contain ${result} NoSuchUpload
Test list parts
#initiate multipart upload
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key multipartKey5
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
Should contain ${result} ${BUCKET}
Should contain ${result} multipartKey
Should contain ${result} UploadId
#upload parts
${system} = Evaluate platform.system() platform
Run Keyword if '${system}' == 'Darwin' Create Random file for mac
Run Keyword if '${system}' == 'Linux' Create Random file for linux
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey5 --part-number 1 --body /tmp/part1 --upload-id ${uploadID}
${eTag1} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
Execute echo "Part2" > /tmp/part2
${result} = Execute AWSS3APICli upload-part --bucket ${BUCKET} --key multipartKey5 --part-number 2 --body /tmp/part2 --upload-id ${uploadID}
${eTag2} = Execute and checkrc echo '${result}' | jq -r '.ETag' 0
Should contain ${result} ETag
#list parts
${result} = Execute AWSS3APICli list-parts --bucket ${BUCKET} --key multipartKey5 --upload-id ${uploadID}
${part1} = Execute and checkrc echo '${result}' | jq -r '.Parts[0].ETag' 0
${part2} = Execute and checkrc echo '${result}' | jq -r '.Parts[1].ETag' 0
Should Be equal ${part1} ${eTag1}
Should contain ${part2} ${eTag2}
Should contain ${result} STANDARD
#list parts with max-items and next token
${result} = Execute AWSS3APICli list-parts --bucket ${BUCKET} --key multipartKey5 --upload-id ${uploadID} --max-items 1
${part1} = Execute and checkrc echo '${result}' | jq -r '.Parts[0].ETag' 0
${token} = Execute and checkrc echo '${result}' | jq -r '.NextToken' 0
Should Be equal ${part1} ${eTag1}
Should contain ${result} STANDARD
${result} = Execute AWSS3APICli list-parts --bucket ${BUCKET} --key multipartKey5 --upload-id ${uploadID} --max-items 1 --starting-token ${token}
${part2} = Execute and checkrc echo '${result}' | jq -r '.Parts[0].ETag' 0
Should Be equal ${part2} ${eTag2}
Should contain ${result} STANDARD
#finally abort it
${result} = Execute AWSS3APICli and checkrc abort-multipart-upload --bucket ${BUCKET} --key multipartKey5 --upload-id ${uploadID} 0

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.s3.endpoint;
import org.apache.hadoop.ozone.s3.commontypes.IsoDateAdapter;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* Request for list parts of a multipart upload request.
*/
@XmlAccessorType(XmlAccessType.FIELD)
@XmlRootElement(name = "ListPartsResult", namespace = "http://s3.amazonaws"
+ ".com/doc/2006-03-01/")
public class ListPartsResponse {
@XmlElement(name = "Bucket")
private String bucket;
@XmlElement(name = "Key")
private String key;
@XmlElement(name = "UploadId")
private String uploadID;
@XmlElement(name = "StorageClass")
private String storageClass;
@XmlElement(name = "PartNumberMarker")
private int partNumberMarker;
@XmlElement(name = "NextPartNumberMarker")
private int nextPartNumberMarker;
@XmlElement(name = "MaxParts")
private int maxParts;
@XmlElement(name = "IsTruncated")
private boolean truncated;
@XmlElement(name = "Part")
private List<Part> partList = new ArrayList<>();
public String getBucket() {
return bucket;
}
public void setBucket(String bucket) {
this.bucket = bucket;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getUploadID() {
return uploadID;
}
public void setUploadID(String uploadID) {
this.uploadID = uploadID;
}
public String getStorageClass() {
return storageClass;
}
public void setStorageClass(String storageClass) {
this.storageClass = storageClass;
}
public int getPartNumberMarker() {
return partNumberMarker;
}
public void setPartNumberMarker(int partNumberMarker) {
this.partNumberMarker = partNumberMarker;
}
public int getNextPartNumberMarker() {
return nextPartNumberMarker;
}
public void setNextPartNumberMarker(int nextPartNumberMarker) {
this.nextPartNumberMarker = nextPartNumberMarker;
}
public int getMaxParts() {
return maxParts;
}
public void setMaxParts(int maxParts) {
this.maxParts = maxParts;
}
public boolean getTruncated() {
return truncated;
}
public void setTruncated(boolean truncated) {
this.truncated = truncated;
}
public List<Part> getPartList() {
return partList;
}
public void setPartList(List<Part> partList) {
this.partList = partList;
}
public void addPart(Part part) {
this.partList.add(part);
}
/**
* Part information.
*/
@XmlAccessorType(XmlAccessType.FIELD)
@XmlRootElement(name = "Part")
public static class Part {
@XmlElement(name = "PartNumber")
private int partNumber;
@XmlJavaTypeAdapter(IsoDateAdapter.class)
@XmlElement(name = "LastModified")
private Instant lastModified;
@XmlElement(name = "ETag")
private String eTag;
@XmlElement(name = "Size")
private long size;
public int getPartNumber() {
return partNumber;
}
public void setPartNumber(int partNumber) {
this.partNumber = partNumber;
}
public Instant getLastModified() {
return lastModified;
}
public void setLastModified(Instant lastModified) {
this.lastModified = lastModified;
}
public String getETag() {
return eTag;
}
public void setETag(String tag) {
this.eTag = tag;
}
public long getSize() {
return size;
}
public void setSize(long size) {
this.size = size;
}
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@ -186,17 +187,34 @@ public class ObjectEndpoint extends EndpointBase {
}
/**
* Rest endpoint to download object from a bucket.
* Rest endpoint to download object from a bucket, if query param uploadId
* is specified, request for list parts of a multipart upload key with
* specific uploadId.
* <p>
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html for
* more details.
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
* https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
* for more details.
*/
@GET
public Response get(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@QueryParam("uploadId") String uploadId,
@QueryParam("max-parts") @DefaultValue("1000") int maxParts,
@QueryParam("part-number-marker") String partNumberMarker,
InputStream body) throws IOException, OS3Exception {
try {
if (uploadId != null) {
// When we have uploadId, this is the request for list Parts.
int partMarker = 0;
if (partNumberMarker != null) {
partMarker = Integer.parseInt(partNumberMarker);
}
return listParts(bucketName, keyPath, uploadId,
partMarker, maxParts);
}
OzoneBucket bucket = getBucket(bucketName);
OzoneKeyDetails keyDetails = bucket.getKey(keyPath);
@ -550,6 +568,68 @@ public class ObjectEndpoint extends EndpointBase {
}
/**
* Returns response for the listParts request.
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
* @param bucket
* @param key
* @param uploadID
* @param partNumberMarker
* @param maxParts
* @return
* @throws IOException
* @throws OS3Exception
*/
private Response listParts(String bucket, String key, String uploadID,
int partNumberMarker, int maxParts) throws IOException, OS3Exception {
ListPartsResponse listPartsResponse = new ListPartsResponse();
try {
OzoneBucket ozoneBucket = getBucket(bucket);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
ozoneBucket.listParts(key, uploadID, partNumberMarker, maxParts);
listPartsResponse.setBucket(bucket);
listPartsResponse.setKey(key);
listPartsResponse.setUploadID(uploadID);
listPartsResponse.setMaxParts(maxParts);
listPartsResponse.setPartNumberMarker(partNumberMarker);
listPartsResponse.setTruncated(false);
if (ozoneMultipartUploadPartListParts.getReplicationType().toString()
.equals(ReplicationType.STAND_ALONE.toString())) {
listPartsResponse.setStorageClass(S3StorageType.REDUCED_REDUNDANCY
.toString());
} else {
listPartsResponse.setStorageClass(S3StorageType.STANDARD.toString());
}
if (ozoneMultipartUploadPartListParts.isTruncated()) {
listPartsResponse.setTruncated(
ozoneMultipartUploadPartListParts.isTruncated());
listPartsResponse.setNextPartNumberMarker(
ozoneMultipartUploadPartListParts.getNextPartNumberMarker());
}
ozoneMultipartUploadPartListParts.getPartInfoList().forEach(partInfo -> {
ListPartsResponse.Part part = new ListPartsResponse.Part();
part.setPartNumber(partInfo.getPartNumber());
part.setETag(partInfo.getPartName());
part.setSize(partInfo.getSize());
part.setLastModified(Instant.ofEpochMilli(
partInfo.getModificationTime()));
listPartsResponse.addPart(part);
});
} catch (IOException ex) {
if (ex.getMessage().contains("NO_SUCH_MULTIPART_UPLOAD_ERROR")) {
OS3Exception os3Exception = S3ErrorTable.newError(NO_SUCH_UPLOAD,
uploadID);
throw os3Exception;
}
throw ex;
}
return Response.status(Status.OK).entity(listPartsResponse).build();
}
@VisibleForTesting
public void setHeaders(HttpHeaders headers) {
this.headers = headers;

View File

@ -38,8 +38,10 @@ import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts.PartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.util.Time;
/**
* In-memory ozone bucket for testing.
@ -236,6 +238,55 @@ public class OzoneBucketStub extends OzoneBucket {
}
}
@Override
public OzoneMultipartUploadPartListParts listParts(String key,
String uploadID, int partNumberMarker, int maxParts) throws IOException {
if (multipartUploadIdMap.get(key) == null) {
throw new IOException("NO_SUCH_MULTIPART_UPLOAD");
}
List<PartInfo> partInfoList = new ArrayList<>();
if (partList.get(key) == null) {
return new OzoneMultipartUploadPartListParts(ReplicationType.STAND_ALONE,
0, false);
} else {
Map<Integer, Part> partMap = partList.get(key);
Iterator<Map.Entry<Integer, Part>> partIterator =
partMap.entrySet().iterator();
int count = 0;
int nextPartNumberMarker = 0;
boolean truncated = false;
while (count < maxParts && partIterator.hasNext()) {
Map.Entry<Integer, Part> partEntry = partIterator.next();
nextPartNumberMarker = partEntry.getKey();
if (partEntry.getKey() > partNumberMarker) {
PartInfo partInfo = new PartInfo(partEntry.getKey(),
partEntry.getValue().getPartName(),
partEntry.getValue().getContent().length, Time.now());
partInfoList.add(partInfo);
count++;
}
}
if (partIterator.hasNext()) {
truncated = true;
} else {
truncated = false;
nextPartNumberMarker = 0;
}
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
new OzoneMultipartUploadPartListParts(ReplicationType.STAND_ALONE,
nextPartNumberMarker, truncated);
ozoneMultipartUploadPartListParts.addAllParts(partInfoList);
return ozoneMultipartUploadPartListParts;
}
}
/**
* Class used to hold part information in a upload part request.
*/

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.s3.endpoint;
import org.apache.hadoop.ozone.client.OzoneClientStub;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;
/**
* This class test list parts request.
*/
public class TestListParts {
private final static ObjectEndpoint REST = new ObjectEndpoint();
private final static String BUCKET = "s3bucket";
private final static String KEY = "key1";
private static String uploadID;
@BeforeClass
public static void setUp() throws Exception {
OzoneClientStub client = new OzoneClientStub();
client.getObjectStore().createS3Bucket("ozone", BUCKET);
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
"STANDARD");
REST.setHeaders(headers);
REST.setClient(client);
Response response = REST.multipartUpload(BUCKET, KEY, "", "", null);
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
(MultipartUploadInitiateResponse) response.getEntity();
assertNotNull(multipartUploadInitiateResponse.getUploadID());
uploadID = multipartUploadInitiateResponse.getUploadID();
assertEquals(response.getStatus(), 200);
String content = "Multipart Upload";
ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes());
response = REST.put(BUCKET, KEY, content.length(), 1, uploadID, body);
assertNotNull(response.getHeaderString("ETag"));
response = REST.put(BUCKET, KEY, content.length(), 2, uploadID, body);
assertNotNull(response.getHeaderString("ETag"));
response = REST.put(BUCKET, KEY, content.length(), 3, uploadID, body);
assertNotNull(response.getHeaderString("ETag"));
}
@Test
public void testListParts() throws Exception {
Response response = REST.get(BUCKET, KEY, uploadID, 3, "0", null);
ListPartsResponse listPartsResponse =
(ListPartsResponse) response.getEntity();
Assert.assertFalse(listPartsResponse.getTruncated());
Assert.assertTrue(listPartsResponse.getPartList().size() == 3);
}
@Test
public void testListPartsContinuation() throws Exception {
Response response = REST.get(BUCKET, KEY, uploadID, 2, "0", null);
ListPartsResponse listPartsResponse =
(ListPartsResponse) response.getEntity();
Assert.assertTrue(listPartsResponse.getTruncated());
Assert.assertTrue(listPartsResponse.getPartList().size() == 2);
// Continue
response = REST.get(BUCKET, KEY, uploadID, 2,
Integer.toString(listPartsResponse.getNextPartNumberMarker()), null);
listPartsResponse = (ListPartsResponse) response.getEntity();
Assert.assertFalse(listPartsResponse.getTruncated());
Assert.assertTrue(listPartsResponse.getPartList().size() == 1);
}
@Test
public void testListPartsWithUnknownUploadID() throws Exception {
try {
Response response = REST.get(BUCKET, KEY, uploadID, 2, "0", null);
} catch (OS3Exception ex) {
Assert.assertEquals(S3ErrorTable.NO_SUCH_UPLOAD.getErrorMessage(),
ex.getErrorMessage());
}
}
}

View File

@ -70,7 +70,7 @@ public class TestObjectGet {
new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
//WHEN
Response response = rest.get("b1", "key1", body);
Response response = rest.get("b1", "key1", null, 0, null, body);
//THEN
OzoneInputStream ozoneInputStream =

View File

@ -44,7 +44,7 @@ import static org.mockito.Mockito.when;
*/
public class TestPartUpload {
private final static ObjectEndpoint REST = new ObjectEndpoint();;
private final static ObjectEndpoint REST = new ObjectEndpoint();
private final static String BUCKET = "s3bucket";
private final static String KEY = "key1";