diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 957aab077f5..118cf24d5c1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -237,10 +237,14 @@ private OzoneConsts() { public static final String REPLICATION_TYPE = "replicationType"; public static final String REPLICATION_FACTOR = "replicationFactor"; public static final String KEY_LOCATION_INFO = "keyLocationInfo"; + public static final String MULTIPART_LIST = "multipartList"; // For OM metrics saving to a file public static final String OM_METRICS_FILE = "omMetrics"; public static final String OM_METRICS_TEMP_FILE = OM_METRICS_FILE + ".tmp"; + // For Multipart upload + public static final int OM_MULTIPART_MIN_SIZE = 5 * 1024 * 1024; + } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java index 3a681d7ff51..e6a6e12f1ff 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java @@ -31,10 +31,12 @@ import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; /** @@ -353,6 +355,15 @@ public OmMultipartInfo initiateMultipartUpload(String key) defaultReplication); } + /** + * Create a part key for a multipart upload key. + * @param key + * @param size + * @param partNumber + * @param uploadID + * @return OzoneOutputStream + * @throws IOException + */ public OzoneOutputStream createMultipartKey(String key, long size, int partNumber, String uploadID) throws IOException { @@ -360,6 +371,21 @@ public OzoneOutputStream createMultipartKey(String key, long size, uploadID); } + /** + * Complete Multipart upload. This will combine all the parts and make the + * key visible in ozone. + * @param key + * @param uploadID + * @param partsMap + * @return OmMultipartUploadCompleteInfo + * @throws IOException + */ + public OmMultipartUploadCompleteInfo completeMultipartUpload(String key, + String uploadID, Map partsMap) throws IOException { + return proxy.completeMultipartUpload(volumeName, name, key, uploadID, + partsMap); + } + /** * An Iterator to iterate over {@link OzoneKey} list. */ diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index 02dc530b1e6..5960943c1f2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -27,9 +27,11 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import java.io.IOException; import java.util.List; +import java.util.Map; /** * An implementer of this interface is capable of connecting to Ozone Cluster @@ -401,10 +403,35 @@ OmMultipartInfo initiateMultipartUpload(String volumeName, String bucketName, String keyName, ReplicationType type, ReplicationFactor factor) throws IOException; + /** + * Create a part key for a multipart upload key. + * @param volumeName + * @param bucketName + * @param keyName + * @param size + * @param partNumber + * @param uploadID + * @return OzoneOutputStream + * @throws IOException + */ OzoneOutputStream createMultipartKey(String volumeName, String bucketName, String keyName, long size, int partNumber, String uploadID) throws IOException; + /** + * Complete Multipart upload. This will combine all the parts and make the + * key visible in ozone. + * @param volumeName + * @param bucketName + * @param keyName + * @param uploadID + * @param partsMap + * @return OmMultipartUploadCompleteInfo + * @throws IOException + */ + OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName, + String bucketName, String keyName, String uploadID, + Map partsMap) throws IOException; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java index b40d5e1a0bd..85e559fbcc7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java @@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort; import org.apache.hadoop.ozone.web.response.ListBuckets; @@ -79,6 +80,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; @@ -975,4 +977,12 @@ public OzoneOutputStream createMultipartKey(String volumeName, throw new UnsupportedOperationException("Ozone REST protocol does not " + "support this operation."); } + + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + String volumeName, String bucketName, String keyName, String uploadID, + Map partsMap) throws IOException { + throw new UnsupportedOperationException("Ozone REST protocol does not " + + "support this operation."); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 17433726994..72da5f8816b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -50,6 +50,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -753,4 +755,29 @@ public OzoneOutputStream createMultipartKey(String volumeName, return new OzoneOutputStream(groupOutputStream); } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + String volumeName, String bucketName, String keyName, String uploadID, + Map partsMap) throws IOException { + HddsClientUtils.verifyResourceName(volumeName, bucketName); + HddsClientUtils.checkNotNull(keyName, uploadID); + + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setMultipartUploadID(uploadID) + .build(); + + OmMultipartUploadList omMultipartUploadList = new OmMultipartUploadList( + partsMap); + + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = + ozoneManagerClient.completeMultipartUpload(keyArgs, + omMultipartUploadList); + + return omMultipartUploadCompleteInfo; + + } + } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java index a65f6bc7243..d3b2522a488 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java @@ -48,7 +48,8 @@ public enum OMAction implements AuditAction { READ_KEY, LIST_S3BUCKETS, INITIATE_MULTIPART_UPLOAD, - COMMIT_MULTIPART_UPLOAD_PARTKEY; + COMMIT_MULTIPART_UPLOAD_PARTKEY, + COMPLETE_MULTIPART_UPLOAD; @Override public String getAction() { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java index 152091df2ff..2c976fb9ffe 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; /** * This class represents multipart upload information for a key, which holds @@ -30,7 +31,7 @@ */ public class OmMultipartKeyInfo { private String uploadID; - private Map partKeyInfoList; + private TreeMap partKeyInfoList; /** * Construct OmMultipartKeyInfo object which holds multipart upload @@ -40,7 +41,7 @@ public class OmMultipartKeyInfo { */ public OmMultipartKeyInfo(String id, Map list) { this.uploadID = id; - this.partKeyInfoList = new HashMap<>(list); + this.partKeyInfoList = new TreeMap<>(list); } /** @@ -51,7 +52,7 @@ public String getUploadID() { return uploadID; } - public Map getPartKeyInfoList() { + public TreeMap getPartKeyInfoList() { return partKeyInfoList; } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java new file mode 100644 index 00000000000..71ce882c6f8 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.helpers; + +/** + * This class holds information about the response of complete Multipart + * upload request. + */ +public class OmMultipartUploadCompleteInfo { + + private String volume; + private String bucket; + private String key; + private String hash; // this is used as ETag for S3. + + public OmMultipartUploadCompleteInfo(String volumeName, String bucketName, + String keyName, String md5) { + this.volume = volumeName; + this.bucket = bucketName; + this.key = keyName; + this.hash = md5; + } + + public String getVolume() { + return volume; + } + + public void setVolume(String volume) { + this.volume = volume; + } + + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getHash() { + return hash; + } + + public void setHash(String hash) { + this.hash = hash; + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java new file mode 100644 index 00000000000..99cd5ad5126 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * This class represents multipart list, which is required for + * CompleteMultipart upload request. + */ +public class OmMultipartUploadList { + + private final TreeMap multipartMap; + + /** + * Construct OmMultipartUploadList which holds multipart map which contains + * part number and part name. + * @param partMap + */ + public OmMultipartUploadList(Map partMap) { + this.multipartMap = new TreeMap<>(partMap); + } + + /** + * Return multipartMap which is a map of part number and part name. + * @return multipartMap + */ + public TreeMap getMultipartMap() { + return multipartMap; + } + + /** + * Construct Part list from the multipartMap. + * @return List + */ + public List getPartsList() { + List partList = new ArrayList<>(); + multipartMap.forEach((partNumber, partName) -> partList.add(Part + .newBuilder().setPartName(partName).setPartNumber(partNumber).build())); + return partList; + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index ee1ff6fb26a..09da1abdd0d 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -315,9 +317,26 @@ List listS3Buckets(String userName, String startBucketName, OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException; - + /** + * Commit Multipart upload part file. + * @param omKeyArgs + * @param clientID + * @return OmMultipartCommitUploadPartInfo + * @throws IOException + */ OmMultipartCommitUploadPartInfo commitMultipartUploadPart( OmKeyArgs omKeyArgs, long clientID) throws IOException; + /** + * Complete Multipart upload Request. + * @param omKeyArgs + * @param multipartUploadList + * @return OmMultipartUploadCompleteInfo + * @throws IOException + */ + OmMultipartUploadCompleteInfo completeMultipartUpload( + OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList) + throws IOException; + } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 2059c83fe41..220e839f1ea 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -88,6 +90,10 @@ .MultipartInfoInitiateRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartInfoInitiateResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteResponse; import org.apache.hadoop.ozone.protocol.proto .OzoneManagerProtocolProtos.RenameKeyRequest; import org.apache.hadoop.ozone.protocol.proto @@ -970,6 +976,11 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws @Override public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( OmKeyArgs omKeyArgs, long clientId) throws IOException { + + List locationInfoList = omKeyArgs.getLocationInfoList(); + Preconditions.checkNotNull(locationInfoList); + + MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest = MultipartCommitUploadPartRequest.newBuilder(); @@ -979,7 +990,11 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( .setKeyName(omKeyArgs.getKeyName()) .setMultipartUploadID(omKeyArgs.getMultipartUploadID()) .setIsMultipartKey(omKeyArgs.getIsMultipartKey()) - .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber()); + .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber()) + .setDataSize(omKeyArgs.getDataSize()) + .addAllKeyLocations( + locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf) + .collect(Collectors.toList())); multipartCommitUploadPartRequest.setClientID(clientId); multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build()); @@ -1002,6 +1017,42 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( return info; } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList) + throws IOException { + MultipartUploadCompleteRequest.Builder multipartUploadCompleteRequest = + MultipartUploadCompleteRequest.newBuilder(); + + KeyArgs.Builder keyArgs = KeyArgs.newBuilder() + .setVolumeName(omKeyArgs.getVolumeName()) + .setBucketName(omKeyArgs.getBucketName()) + .setKeyName(omKeyArgs.getKeyName()) + .setMultipartUploadID(omKeyArgs.getMultipartUploadID()); + + multipartUploadCompleteRequest.setKeyArgs(keyArgs.build()); + multipartUploadCompleteRequest.addAllPartsList(multipartUploadList + .getPartsList()); + + OMRequest omRequest = createOMRequest( + Type.CompleteMultiPartUpload) + .setCompleteMultiPartUploadRequest( + multipartUploadCompleteRequest.build()).build(); + + MultipartUploadCompleteResponse response = submitRequest(omRequest) + .getCompleteMultiPartUploadResponse(); + + if (response.getStatus() != Status.OK) { + throw new IOException("Complete multipart upload failed, error:" + + response.getStatus()); + } + + OmMultipartUploadCompleteInfo info = new + OmMultipartUploadCompleteInfo(response.getVolume(), response + .getBucket(), response.getKey(), response.getHash()); + return info; + } + public List getServiceList() throws IOException { ServiceListRequest req = ServiceListRequest.newBuilder().build(); diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 93c86f12bc4..b32d324b3e3 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -65,6 +65,7 @@ enum Type { ListS3Buckets = 44; InitiateMultiPartUpload = 45; CommitMultiPartUpload = 46; + CompleteMultiPartUpload = 47; ServiceList = 51; } @@ -105,6 +106,7 @@ message OMRequest { optional S3ListBucketsRequest listS3BucketsRequest = 44; optional MultipartInfoInitiateRequest initiateMultiPartUploadRequest = 45; optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest = 46; + optional MultipartUploadCompleteRequest completeMultiPartUploadRequest = 47; optional ServiceListRequest serviceListRequest = 51; } @@ -146,6 +148,7 @@ message OMResponse { optional S3ListBucketsResponse listS3BucketsResponse = 44; optional MultipartInfoInitiateResponse initiateMultiPartUploadResponse = 45; optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse = 46; + optional MultipartUploadCompleteResponse completeMultiPartUploadResponse = 47; optional ServiceListResponse ServiceListResponse = 51; } @@ -177,6 +180,10 @@ enum Status { INITIATE_MULTIPART_UPLOAD_ERROR = 24; MULTIPART_UPLOAD_PARTFILE_ERROR = 25; NO_SUCH_MULTIPART_UPLOAD_ERROR = 26; + MISMATCH_MULTIPART_LIST = 27; + MISSING_UPLOAD_PARTS = 28; + COMPLETE_MULTIPART_UPLOAD_ERROR = 29; + ENTITY_TOO_SMALL = 30; } @@ -583,6 +590,24 @@ message MultipartCommitUploadPartResponse { required Status status = 2; } +message MultipartUploadCompleteRequest { + required KeyArgs keyArgs = 1; + repeated Part partsList = 2; +} + +message MultipartUploadCompleteResponse { + optional string volume = 1; + optional string bucket = 2; + optional string key = 3; + optional string hash = 4; // This will be used as etag for s3 + required Status status = 5; +} + +message Part { + required uint32 partNumber = 1; + required string partName = 2; +} + /** The OM service that takes care of Ozone namespace. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 90c3c1f8b4e..f82da1644b3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -69,8 +69,11 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import static org.hamcrest.CoreMatchers.containsString; @@ -1497,7 +1500,274 @@ public void testNoSuchUploadError() throws IOException { GenericTestUtils.assertExceptionContains("NO_SUCH_MULTIPART_UPLOAD_ERROR", ex); } + } + @Test + public void testMultipartUpload() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + doMultipartUpload(bucket, keyName, (byte)98); + + } + + + @Test + public void testMultipartUploadOverride() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + doMultipartUpload(bucket, keyName, (byte)96); + + // Initiate Multipart upload again, now we should read latest version, as + // read always reads latest blocks. + doMultipartUpload(bucket, keyName, (byte)97); + + } + + + @Test + public void testMultipartUploadWithPartsLessThanMinSize() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // Initiate multipart upload + String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType + .STAND_ALONE, ReplicationFactor.ONE); + + // Upload Parts + Map partsMap = new TreeMap<>(); + // Uploading part 1 with less than min size + String partName = uploadPart(bucket, keyName, uploadID, 1, "data".getBytes( + "UTF-8")); + partsMap.put(1, partName); + + partName = uploadPart(bucket, keyName, uploadID, 2, "data".getBytes( + "UTF-8")); + partsMap.put(2, partName); + + + // Complete multipart upload + + try { + completeMultipartUpload(bucket, keyName, uploadID, partsMap); + fail("testMultipartUploadWithPartsLessThanMinSize failed"); + } catch (IOException ex) { + GenericTestUtils.assertExceptionContains("ENTITY_TOO_SMALL", ex); + } + + } + + + + @Test + public void testMultipartUploadWithPartsMisMatchWithListSizeDifferent() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType + .STAND_ALONE, ReplicationFactor.ONE); + + // We have not uploaded any parts, but passing some list it should throw + // error. + TreeMap partsMap = new TreeMap<>(); + partsMap.put(1, UUID.randomUUID().toString()); + + try { + completeMultipartUpload(bucket, keyName, uploadID, partsMap); + fail("testMultipartUploadWithPartsMisMatch"); + } catch (IOException ex) { + GenericTestUtils.assertExceptionContains("MISMATCH_MULTIPART_LIST", ex); + } + + } + + @Test + public void testMultipartUploadWithPartsMisMatchWithIncorrectPartName() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType + .STAND_ALONE, ReplicationFactor.ONE); + + uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8")); + // We have not uploaded any parts, but passing some list it should throw + // error. + TreeMap partsMap = new TreeMap<>(); + partsMap.put(1, UUID.randomUUID().toString()); + + try { + completeMultipartUpload(bucket, keyName, uploadID, partsMap); + fail("testMultipartUploadWithPartsMisMatch"); + } catch (IOException ex) { + GenericTestUtils.assertExceptionContains("MISMATCH_MULTIPART_LIST", ex); + } + + } + + @Test + public void testMultipartUploadWithMissingParts() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType + .STAND_ALONE, ReplicationFactor.ONE); + + uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8")); + // We have not uploaded any parts, but passing some list it should throw + // error. + TreeMap partsMap = new TreeMap<>(); + partsMap.put(3, "random"); + + try { + completeMultipartUpload(bucket, keyName, uploadID, partsMap); + fail("testMultipartUploadWithPartsMisMatch"); + } catch (IOException ex) { + GenericTestUtils.assertExceptionContains("MISSING_UPLOAD_PARTS", ex); + } + } + + + private byte[] generateData(int size, byte val) { + byte[] chars = new byte[size]; + Arrays.fill(chars, val); + return chars; + } + + + private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val) + throws Exception { + // Initiate Multipart upload request + String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType + .RATIS, ReplicationFactor.THREE); + + // Upload parts + Map partsMap = new TreeMap<>(); + + // get 5mb data, as each part should be of min 5mb, last part can be less + // than 5mb + int length = 0; + byte[] data = generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, val); + String partName = uploadPart(bucket, keyName, uploadID, 1, data); + partsMap.put(1, partName); + length += data.length; + + + partName = uploadPart(bucket, keyName, uploadID, 2, data); + partsMap.put(2, partName); + length += data.length; + + String part3 = UUID.randomUUID().toString(); + partName = uploadPart(bucket, keyName, uploadID, 3, part3.getBytes( + "UTF-8")); + partsMap.put(3, partName); + length += part3.getBytes("UTF-8").length; + + + // Complete multipart upload request + completeMultipartUpload(bucket, keyName, uploadID, partsMap); + + + //Now Read the key which has been completed multipart upload. + byte[] fileContent = new byte[data.length + data.length + part3.getBytes( + "UTF-8").length]; + OzoneInputStream inputStream = bucket.readKey(keyName); + inputStream.read(fileContent); + + Assert.assertTrue(verifyRatisReplication(bucket.getVolumeName(), + bucket.getName(), keyName, ReplicationType.RATIS, + ReplicationFactor.THREE)); + + StringBuilder sb = new StringBuilder(length); + + // Combine all parts data, and check is it matching with get key data. + String part1 = new String(data); + String part2 = new String(data); + sb.append(part1); + sb.append(part2); + sb.append(part3); + Assert.assertEquals(sb.toString(), new String(fileContent)); + } + + + private String initiateMultipartUpload(OzoneBucket bucket, String keyName, + ReplicationType replicationType, ReplicationFactor replicationFactor) + throws Exception { + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, + replicationType, replicationFactor); + + String uploadID = multipartInfo.getUploadID(); + Assert.assertNotNull(uploadID); + return uploadID; + } + + private String uploadPart(OzoneBucket bucket, String keyName, String + uploadID, int partNumber, byte[] data) throws Exception { + OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName, + data.length, partNumber, uploadID); + ozoneOutputStream.write(data, 0, + data.length); + ozoneOutputStream.close(); + + OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo = + ozoneOutputStream.getCommitUploadPartInfo(); + + Assert.assertNotNull(omMultipartCommitUploadPartInfo); + Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName()); + return omMultipartCommitUploadPartInfo.getPartName(); + + } + + private void completeMultipartUpload(OzoneBucket bucket, String keyName, + String uploadID, Map partsMap) throws Exception { + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket + .completeMultipartUpload(keyName, uploadID, partsMap); + + Assert.assertNotNull(omMultipartUploadCompleteInfo); + Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket + .getName()); + Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket + .getVolumeName()); + Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName); + Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash()); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index f5f3f1bbacd..29e0c608b1c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.utils.BackgroundService; @@ -191,8 +193,24 @@ List listKeys(String volumeName, */ OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException; + /** + * Commit Multipart upload part file. + * @param omKeyArgs + * @param clientID + * @return OmMultipartCommitUploadPartInfo + * @throws IOException + */ OmMultipartCommitUploadPartInfo commitMultipartUploadPart( - OmKeyArgs keyArgs, long clientID) throws IOException; + OmKeyArgs omKeyArgs, long clientID) throws IOException; + /** + * Complete Multipart upload Request. + * @param omKeyArgs + * @param multipartUploadList + * @return OmMultipartUploadCompleteInfo + * @throws IOException + */ + OmMultipartUploadCompleteInfo completeMultipartUpload(OmKeyArgs omKeyArgs, + OmMultipartUploadList multipartUploadList) throws IOException; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 614d453716d..1347e1cabcb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -22,9 +22,11 @@ import java.util.List; import java.util.Map; import java.util.HashMap; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -43,6 +45,8 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .PartKeyInfo; @@ -62,6 +66,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -677,6 +682,10 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get( openKey); + // set the data size and location info list + keyInfo.setDataSize(omKeyArgs.getDataSize()); + keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList()); + partName = keyName + clientID; if (multipartKeyInfo == null) { throw new OMException("No such Multipart upload is with specified " + @@ -729,4 +738,152 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList) + throws IOException { + Preconditions.checkNotNull(omKeyArgs); + Preconditions.checkNotNull(multipartUploadList); + String volumeName = omKeyArgs.getVolumeName(); + String bucketName = omKeyArgs.getBucketName(); + String keyName = omKeyArgs.getKeyName(); + String uploadID = omKeyArgs.getMultipartUploadID(); + metadataManager.getLock().acquireBucketLock(volumeName, bucketName); + try { + String multipartKey = metadataManager.getMultipartKey(volumeName, + bucketName, keyName, uploadID); + String ozoneKey = metadataManager.getOzoneKey(volumeName, bucketName, + keyName); + OmKeyInfo keyInfo = metadataManager.getKeyTable().get(ozoneKey); + + OmMultipartKeyInfo multipartKeyInfo = metadataManager + .getMultipartInfoTable().get(multipartKey); + if (multipartKeyInfo == null) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + ResultCodes.NO_SUCH_MULTIPART_UPLOAD); + } + TreeMap partKeyInfoMap = multipartKeyInfo + .getPartKeyInfoList(); + + TreeMap multipartMap = multipartUploadList + .getMultipartMap(); + + // Last key in the map should be having key value as size, as map's + // are sorted. Last entry in both maps should have partNumber as size + // of the map. As we have part entries 1, 2, 3, 4 and then we get + // complete multipart upload request so the map last entry should have 4, + // if it is having value greater or less than map size, then there is + // some thing wrong throw error. + + Map.Entry multipartMapLastEntry = multipartMap + .lastEntry(); + Map.Entry partKeyInfoLastEntry = partKeyInfoMap + .lastEntry(); + if (partKeyInfoMap.size() != multipartMap.size()) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + ResultCodes.MISMATCH_MULTIPART_LIST); + } + + // Last entry part Number should be the size of the map, otherwise this + // means we have missing some parts but we got a complete request. + if (multipartMapLastEntry.getKey() != partKeyInfoMap.size() || + partKeyInfoLastEntry.getKey() != partKeyInfoMap.size()) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + ResultCodes.MISSING_UPLOAD_PARTS); + } + ReplicationType type = partKeyInfoLastEntry.getValue().getPartKeyInfo() + .getType(); + ReplicationFactor factor = partKeyInfoLastEntry.getValue() + .getPartKeyInfo().getFactor(); + List locations = new ArrayList<>(); + long size = 0; + int partsCount =1; + int partsMapSize = partKeyInfoMap.size(); + for(Map.Entry partKeyInfoEntry : partKeyInfoMap + .entrySet()) { + int partNumber = partKeyInfoEntry.getKey(); + PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue(); + // Check we have all parts to complete multipart upload and also + // check partNames provided match with actual part names + String providedPartName = multipartMap.get(partNumber); + String actualPartName = partKeyInfo.getPartName(); + if (partNumber == partsCount) { + if (!actualPartName.equals(providedPartName)) { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + ResultCodes.MISMATCH_MULTIPART_LIST); + } + OmKeyInfo currentPartKeyInfo = OmKeyInfo + .getFromProtobuf(partKeyInfo.getPartKeyInfo()); + // Check if any part size is less than 5mb, last part can be less + // than 5 mb. + if (partsCount != partsMapSize && + currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) { + throw new OMException("Complete Multipart Upload Failed: Entity " + + "too small: volume: " + volumeName + "bucket: " + bucketName + + "key: " + keyName, ResultCodes.ENTITY_TOO_SMALL); + } + // As all part keys will have only one version. + OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo + .getKeyLocationVersions().get(0); + locations.addAll(currentKeyInfoGroup.getLocationList()); + size += currentPartKeyInfo.getDataSize(); + } else { + throw new OMException("Complete Multipart Upload Failed: volume: " + + volumeName + "bucket: " + bucketName + "key: " + keyName, + ResultCodes.MISSING_UPLOAD_PARTS); + } + partsCount++; + } + if (keyInfo == null) { + // This is a newly added key, it does not have any versions. + OmKeyLocationInfoGroup keyLocationInfoGroup = new + OmKeyLocationInfoGroup(0, locations); + // A newly created key, this is the first version. + keyInfo = new OmKeyInfo.Builder() + .setVolumeName(omKeyArgs.getVolumeName()) + .setBucketName(omKeyArgs.getBucketName()) + .setKeyName(omKeyArgs.getKeyName()) + .setReplicationFactor(factor) + .setReplicationType(type) + .setCreationTime(Time.now()) + .setModificationTime(Time.now()) + .setDataSize(size) + .setOmKeyLocationInfos( + Collections.singletonList(keyLocationInfoGroup)) + .build(); + } else { + // Already a version exists, so we should add it as a new version. + // But now as versioning is not supported, just following the commit + // key approach. + // When versioning support comes, then we can uncomment below code + // keyInfo.addNewVersion(locations); + keyInfo.updateLocationInfoList(locations); + } + DBStore store = metadataManager.getStore(); + try (BatchOperation batch = store.initBatchOperation()) { + //Remove entry in multipart table and add a entry in to key table + metadataManager.getMultipartInfoTable().deleteWithBatch(batch, + multipartKey); + metadataManager.getKeyTable().putWithBatch(batch, + ozoneKey, keyInfo); + store.commitBatchOperation(batch); + } + return new OmMultipartUploadCompleteInfo(omKeyArgs.getVolumeName(), + omKeyArgs.getBucketName(), omKeyArgs.getKeyName(), DigestUtils + .sha256Hex(keyName)); + } catch (OMException ex) { + throw ex; + } catch (IOException ex) { + LOG.error("Complete Multipart Upload Failed: volume: " + volumeName + + "bucket: " + bucketName + "key: " + keyName, ex); + throw new OMException(ex.getMessage(), ResultCodes + .COMPLETE_MULTIPART_UPLOAD_FAILED); + } finally { + metadataManager.getLock().releaseBucketLock(volumeName, bucketName); + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index 715ebb8e1f0..89e1679b7c0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -62,6 +62,7 @@ public class OMMetrics { private @Metric MutableCounterLong numGetServiceLists; private @Metric MutableCounterLong numListS3Buckets; private @Metric MutableCounterLong numInitiateMultipartUploads; + private @Metric MutableCounterLong numCompleteMultipartUploads; // Failure Metrics @@ -88,6 +89,7 @@ public class OMMetrics { private @Metric MutableCounterLong numInitiateMultipartUploadFails; private @Metric MutableCounterLong numCommitMultipartUploadParts; private @Metric MutableCounterLong getNumCommitMultipartUploadPartFails; + private @Metric MutableCounterLong numCompleteMultipartUploadFails; // Metrics for total number of volumes, buckets and keys @@ -247,6 +249,15 @@ public void incNumCommitMultipartUploadPartFails() { numInitiateMultipartUploadFails.incr(); } + public void incNumCompleteMultipartUploads() { + numKeyOps.incr(); + numCompleteMultipartUploads.incr(); + } + + public void incNumCompleteMultipartUploadFails() { + numCompleteMultipartUploadFails.incr(); + } + public void incNumGetServiceLists() { numGetServiceLists.incr(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 4fc0813e2f3..941b80c7757 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -63,6 +63,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -1645,6 +1647,31 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart( return commitUploadPartInfo; } + @Override + public OmMultipartUploadCompleteInfo completeMultipartUpload( + OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList) + throws IOException { + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo; + metrics.incNumCompleteMultipartUploads(); + + Map auditMap = (omKeyArgs == null) ? new LinkedHashMap<>() : + omKeyArgs.toAuditMap(); + auditMap.put(OzoneConsts.MULTIPART_LIST, multipartUploadList + .getMultipartMap().toString()); + try { + omMultipartUploadCompleteInfo = keyManager.completeMultipartUpload( + omKeyArgs, multipartUploadList); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction + .COMPLETE_MULTIPART_UPLOAD, auditMap)); + return omMultipartUploadCompleteInfo; + } catch (IOException ex) { + metrics.incNumCompleteMultipartUploadFails(); + AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction + .COMPLETE_MULTIPART_UPLOAD, auditMap, ex)); + throw ex; + } + } + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index 6d93a787051..58f75315c66 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -119,6 +119,10 @@ public enum ResultCodes { INITIATE_MULTIPART_UPLOAD_FAILED, NO_SUCH_MULTIPART_UPLOAD, UPLOAD_PART_FAILED, + MISMATCH_MULTIPART_LIST, + MISSING_UPLOAD_PARTS, + COMPLETE_MULTIPART_UPLOAD_FAILED, + ENTITY_TOO_SMALL, INVALID_REQUEST; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 33453ac892f..de88bc61c07 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -28,6 +28,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; @@ -100,6 +102,10 @@ .MultipartCommitUploadPartRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartCommitUploadPartResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartUploadCompleteResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartInfoInitiateRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -108,6 +114,7 @@ .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .RenameKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -149,6 +156,7 @@ import java.io.IOException; import java.util.List; +import java.util.TreeMap; import java.util.stream.Collectors; /** @@ -331,6 +339,13 @@ private OMResponse submitRequestToOM(OMRequest request) responseBuilder.setCommitMultiPartUploadResponse( commitUploadPartResponse); break; + case CompleteMultiPartUpload: + MultipartUploadCompleteResponse completeMultipartUploadResponse = + completeMultipartUpload( + request.getCompleteMultiPartUploadRequest()); + responseBuilder.setCompleteMultiPartUploadResponse( + completeMultipartUploadResponse); + break; case ServiceList: ServiceListResponse serviceListResponse = getServiceList( request.getServiceListRequest()); @@ -392,7 +407,14 @@ private Status exceptionToResponseStatus(IOException ex) { return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR; case UPLOAD_PART_FAILED: return Status.MULTIPART_UPLOAD_PARTFILE_ERROR; - + case COMPLETE_MULTIPART_UPLOAD_FAILED: + return Status.COMPLETE_MULTIPART_UPLOAD_ERROR; + case MISMATCH_MULTIPART_LIST: + return Status.MISMATCH_MULTIPART_LIST; + case MISSING_UPLOAD_PARTS: + return Status.MISSING_UPLOAD_PARTS; + case ENTITY_TOO_SMALL: + return Status.ENTITY_TOO_SMALL; default: return Status.INTERNAL_ERROR; } @@ -839,6 +861,10 @@ private MultipartCommitUploadPartResponse commitMultipartUploadPart( .setMultipartUploadID(keyArgs.getMultipartUploadID()) .setIsMultipartKey(keyArgs.getIsMultipartKey()) .setMultipartUploadPartNumber(keyArgs.getMultipartNumber()) + .setDataSize(keyArgs.getDataSize()) + .setLocationInfoList(keyArgs.getKeyLocationsList().stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList())) .build(); OmMultipartCommitUploadPartInfo commitUploadPartInfo = impl.commitMultipartUploadPart(omKeyArgs, request.getClientID()); @@ -849,4 +875,42 @@ private MultipartCommitUploadPartResponse commitMultipartUploadPart( } return resp.build(); } + + + private MultipartUploadCompleteResponse completeMultipartUpload( + MultipartUploadCompleteRequest request) { + MultipartUploadCompleteResponse.Builder response = + MultipartUploadCompleteResponse.newBuilder(); + + try { + KeyArgs keyArgs = request.getKeyArgs(); + List partsList = request.getPartsListList(); + + TreeMap partsMap = new TreeMap<>(); + for (Part part : partsList) { + partsMap.put(part.getPartNumber(), part.getPartName()); + } + + OmMultipartUploadList omMultipartUploadList = + new OmMultipartUploadList(partsMap); + + OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setMultipartUploadID(keyArgs.getMultipartUploadID()) + .build(); + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = impl + .completeMultipartUpload(omKeyArgs, omMultipartUploadList); + + response.setVolume(omMultipartUploadCompleteInfo.getVolume()) + .setBucket(omMultipartUploadCompleteInfo.getBucket()) + .setKey(omMultipartUploadCompleteInfo.getKey()) + .setHash(omMultipartUploadCompleteInfo.getHash()); + response.setStatus(Status.OK); + } catch (IOException ex) { + response.setStatus(exceptionToResponseStatus(ex)); + } + return response.build(); + } }