diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index c86f27a6ef9..84a6530f37c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest; import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest; import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest; import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest; import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest; @@ -108,6 +109,8 @@ public final class OzoneManagerRatisUtils { return new S3BucketDeleteRequest(omRequest); case InitiateMultiPartUpload: return new S3InitiateMultipartUploadRequest(omRequest); + case CommitMultiPartUpload: + return new S3MultipartUploadCommitPartRequest(omRequest); default: // TODO: will update once all request types are implemented. return null; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java new file mode 100644 index 00000000000..fc3daece5ec --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import com.google.common.base.Optional; +import org.apache.hadoop.ozone.audit.OMAction; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.s3.multipart + .S3MultipartUploadCommitPartResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartCommitUploadPartRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartCommitUploadPartResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.db.cache.CacheKey; +import org.apache.hadoop.utils.db.cache.CacheValue; + +import java.io.IOException; +import java.util.stream.Collectors; + +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; + +/** + * Handle Multipart upload commit upload part file. + */ +public class S3MultipartUploadCommitPartRequest extends OMKeyRequest { + + public S3MultipartUploadCommitPartRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMRequest preExecute(OzoneManager ozoneManager) { + MultipartCommitUploadPartRequest multipartCommitUploadPartRequest = + getOmRequest().getCommitMultiPartUploadRequest(); + + return getOmRequest().toBuilder().setCommitMultiPartUploadRequest( + multipartCommitUploadPartRequest.toBuilder() + .setKeyArgs(multipartCommitUploadPartRequest.getKeyArgs() + .toBuilder().setModificationTime(Time.now()))) + .setUserInfo(getUserInfo()).build(); + } + + @Override + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long transactionLogIndex) { + MultipartCommitUploadPartRequest multipartCommitUploadPartRequest = + getOmRequest().getCommitMultiPartUploadRequest(); + + OzoneManagerProtocolProtos.KeyArgs keyArgs = + multipartCommitUploadPartRequest.getKeyArgs(); + + String volumeName = keyArgs.getVolumeName(); + String bucketName = keyArgs.getBucketName(); + String keyName = keyArgs.getKeyName(); + + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + ozoneManager.getMetrics().incNumCommitMultipartUploadParts(); + + boolean acquiredLock = false; + OmMultipartKeyInfo multipartKeyInfo = null; + OmKeyInfo omKeyInfo = null; + String openKey = null; + String multipartKey = null; + OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo = null; + IOException exception = null; + String partName = null; + try { + // check Acl + if (ozoneManager.getAclsEnabled()) { + checkAcls(ozoneManager, OzoneObj.ResourceType.KEY, + OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE, + volumeName, bucketName, keyName); + } + + acquiredLock = + omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, + bucketName); + + validateBucketAndVolume(omMetadataManager, volumeName, bucketName); + + String uploadID = keyArgs.getMultipartUploadID(); + multipartKey = omMetadataManager.getMultipartKey(volumeName, bucketName, + keyName, uploadID); + + multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + + long clientID = multipartCommitUploadPartRequest.getClientID(); + + openKey = omMetadataManager.getOpenKey( + volumeName, bucketName, keyName, clientID); + + omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey); + + + if (omKeyInfo == null) { + throw new OMException("Failed to commit Multipart Upload key, as " + + openKey + "entry is not found in the openKey table", KEY_NOT_FOUND); + } + + // set the data size and location info list + omKeyInfo.setDataSize(keyArgs.getDataSize()); + omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList())); + // Set Modification time + omKeyInfo.setModificationTime(keyArgs.getModificationTime()); + + partName = omMetadataManager.getOzoneKey(volumeName, bucketName, + keyName) + clientID; + + if (multipartKeyInfo == null) { + // This can occur when user started uploading part by the time commit + // of that part happens, in between the user might have requested + // abort multipart upload. If we just throw exception, then the data + // will not be garbage collected, so move this part to delete table + // and throw error + // Move this part to delete table. + throw new OMException("No such Multipart upload is with specified " + + "uploadId " + uploadID, + OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); + } else { + int partNumber = keyArgs.getMultipartNumber(); + oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber); + + // Build this multipart upload part info. + OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo = + OzoneManagerProtocolProtos.PartKeyInfo.newBuilder(); + partKeyInfo.setPartName(partName); + partKeyInfo.setPartNumber(partNumber); + partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf()); + + // Add this part information in to multipartKeyInfo. + multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build()); + + // Add to cache. + + // Delete from open key table and add it to multipart info table. + // No need to add cache entries to delete table, as no + // read/write requests that info for validation. + omMetadataManager.getMultipartInfoTable().addCacheEntry( + new CacheKey<>(multipartKey), + new CacheValue<>(Optional.of(multipartKeyInfo), + transactionLogIndex)); + + omMetadataManager.getOpenKeyTable().addCacheEntry( + new CacheKey<>(openKey), + new CacheValue<>(Optional.absent(), transactionLogIndex)); + } + + } catch (IOException ex) { + exception = ex; + } finally { + if (acquiredLock) { + omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName, + bucketName); + } + } + + // audit log + auditLog(ozoneManager.getAuditLogger(), buildAuditMessage( + OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs), + exception, getOmRequest().getUserInfo())); + + OMResponse.Builder omResponse = OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CommitMultiPartUpload) + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .setSuccess(true); + + if (exception == null) { + omResponse.setCommitMultiPartUploadResponse( + MultipartCommitUploadPartResponse.newBuilder().setPartName(partName)); + return new S3MultipartUploadCommitPartResponse(multipartKey, openKey, + keyArgs.getModificationTime(), omKeyInfo, multipartKeyInfo, + oldPartKeyInfo, omResponse.build()); + } else { + ozoneManager.getMetrics().incNumCommitMultipartUploadPartFails(); + return new S3MultipartUploadCommitPartResponse(multipartKey, openKey, + keyArgs.getModificationTime(), omKeyInfo, multipartKeyInfo, + oldPartKeyInfo, createErrorOMResponse(omResponse, exception)); + + } + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java new file mode 100644 index 00000000000..2d76a4081e5 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response.s3.multipart; + +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.utils.db.BatchOperation; + +import java.io.IOException; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .Status.NO_SUCH_MULTIPART_UPLOAD_ERROR; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .Status.OK; + +/** + * Response for S3MultipartUploadCommitPart request. + */ +public class S3MultipartUploadCommitPartResponse extends OMClientResponse { + + private String multipartKey; + private String openKey; + private long deleteTimeStamp; + private OmKeyInfo deletePartKeyInfo; + private OmMultipartKeyInfo omMultipartKeyInfo; + private OzoneManagerProtocolProtos.PartKeyInfo oldMultipartKeyInfo; + + + public S3MultipartUploadCommitPartResponse(String multipartKey, + String openKey, long deleteTimeStamp, + OmKeyInfo deletePartKeyInfo, OmMultipartKeyInfo omMultipartKeyInfo, + OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo, + OMResponse omResponse) { + super(omResponse); + this.multipartKey = multipartKey; + this.openKey = openKey; + this.deleteTimeStamp = deleteTimeStamp; + this.deletePartKeyInfo = deletePartKeyInfo; + this.omMultipartKeyInfo = omMultipartKeyInfo; + this.oldMultipartKeyInfo = oldPartKeyInfo; + } + + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + + + if (getOMResponse().getStatus() == NO_SUCH_MULTIPART_UPLOAD_ERROR) { + // Means by the time we try to commit part, some one has aborted this + // multipart upload. So, delete this part information. + omMetadataManager.getDeletedTable().putWithBatch(batchOperation, + OmUtils.getDeletedKeyName(openKey, deleteTimeStamp), + deletePartKeyInfo); + } + + if (getOMResponse().getStatus() == OK) { + + // If we have old part info: + // Need to do 3 steps: + // 1. add old part to delete table + // 2. Commit multipart info which has information about this new part. + // 3. delete this new part entry from open key table. + + // This means for this multipart upload part upload, we have an old + // part information, so delete it. + if (oldMultipartKeyInfo != null) { + omMetadataManager.getDeletedTable().putWithBatch(batchOperation, + OmUtils.getDeletedKeyName(oldMultipartKeyInfo.getPartName(), + deleteTimeStamp), + OmKeyInfo.getFromProtobuf(oldMultipartKeyInfo.getPartKeyInfo())); + } + + + omMetadataManager.getMultipartInfoTable().putWithBatch(batchOperation, + multipartKey, omMultipartKeyInfo); + + // This information has been added to multipartKeyInfo. So, we can + // safely delete part key info from open key table. + omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation, + openKey); + + + } + } + +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java index 44f9b90e988..204394262d9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java @@ -26,8 +26,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .Status; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .Type; @@ -70,6 +68,7 @@ public class OzoneManagerHARequestHandlerImpl case CreateS3Bucket: case DeleteS3Bucket: case InitiateMultiPartUpload: + case CommitMultiPartUpload: //TODO: We don't need to pass transactionID, this will be removed when // complete write requests is changed to new model. And also we can // return OMClientResponse, then adding to doubleBuffer can be taken @@ -81,12 +80,12 @@ public class OzoneManagerHARequestHandlerImpl omClientRequest.validateAndUpdateCache(getOzoneManager(), transactionLogIndex); - // If any error we have got when validateAndUpdateCache, OMResponse - // Status is set with Error Code other than OK, in that case don't - // add this to double buffer. - if (omClientResponse.getOMResponse().getStatus() == Status.OK) { - ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex); - } + + // Add OMClient Response to double buffer. + // Each OMClient Response should handle what needs to be done in error + // case. + ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex); + return omClientResponse.getOMResponse(); default: // As all request types are not changed so we need to call handle diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java index 96f3faea59b..d186ad6b34f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java @@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartCommitUploadPartRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -316,4 +318,34 @@ public final class TestOMRequestUtils { .build(); } + /** + * Create OMRequest which encapsulates InitiateMultipartUpload request. + * @param volumeName + * @param bucketName + * @param keyName + */ + public static OMRequest createCommitPartMPURequest(String volumeName, + String bucketName, String keyName, long clientID, long size, + String multipartUploadID, int partNumber) { + + // Just set dummy size. + KeyArgs.Builder keyArgs = + KeyArgs.newBuilder().setVolumeName(volumeName).setKeyName(keyName) + .setBucketName(bucketName) + .setDataSize(size) + .setMultipartNumber(partNumber) + .setMultipartUploadID(multipartUploadID) + .addAllKeyLocations(new ArrayList<>()); + // Just adding dummy list. As this is for UT only. + + MultipartCommitUploadPartRequest multipartCommitUploadPartRequest = + MultipartCommitUploadPartRequest.newBuilder() + .setKeyArgs(keyArgs).setClientID(clientID).build(); + + return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString()) + .setCmdType(OzoneManagerProtocolProtos.Type.CommitMultiPartUpload) + .setCommitMultiPartUploadRequest(multipartCommitUploadPartRequest) + .build(); + } + } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java index 99d8de85a73..619293e1ebc 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java @@ -37,8 +37,8 @@ public class TestS3InitiateMultipartUploadRequest @Test public void testPreExecute() { - doPreExecute(UUID.randomUUID().toString(), UUID.randomUUID().toString(), - UUID.randomUUID().toString()); + doPreExecuteInitiateMPU(UUID.randomUUID().toString(), + UUID.randomUUID().toString(), UUID.randomUUID().toString()); } @@ -52,7 +52,8 @@ public class TestS3InitiateMultipartUploadRequest TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager); - OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName); + OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = new S3InitiateMultipartUploadRequest(modifiedRequest); @@ -97,7 +98,8 @@ public class TestS3InitiateMultipartUploadRequest TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager); - OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName); + OMRequest modifiedRequest = doPreExecuteInitiateMPU( + volumeName, bucketName, keyName); S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = new S3InitiateMultipartUploadRequest(modifiedRequest); @@ -126,7 +128,8 @@ public class TestS3InitiateMultipartUploadRequest String keyName = UUID.randomUUID().toString(); - OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName); + OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, bucketName, + keyName); S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = new S3InitiateMultipartUploadRequest(modifiedRequest); @@ -147,28 +150,4 @@ public class TestS3InitiateMultipartUploadRequest .get(multipartKey)); } - - - - private OMRequest doPreExecute(String volumeName, String bucketName, - String keyName) { - OMRequest omRequest = - TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName, - keyName); - - S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = - new S3InitiateMultipartUploadRequest(omRequest); - - OMRequest modifiedRequest = - s3InitiateMultipartUploadRequest.preExecute(ozoneManager); - - Assert.assertNotEquals(omRequest, modifiedRequest); - Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest()); - Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest() - .getKeyArgs().getMultipartUploadID()); - Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest() - .getKeyArgs().getModificationTime() > 0); - - return modifiedRequest; - } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java index 15b642bad3f..d53235a486d 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java @@ -19,7 +19,11 @@ package org.apache.hadoop.ozone.om.request.s3.multipart; +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -73,4 +77,68 @@ public class TestS3MultipartRequest { Mockito.framework().clearInlineMocks(); } + /** + * Perform preExecute of Initiate Multipart upload request for given + * volume, bucket and key name. + * @param volumeName + * @param bucketName + * @param keyName + * @return OMRequest - returned from preExecute. + */ + protected OMRequest doPreExecuteInitiateMPU( + String volumeName, String bucketName, String keyName) { + OMRequest omRequest = + TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName, + keyName); + + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + new S3InitiateMultipartUploadRequest(omRequest); + + OMRequest modifiedRequest = + s3InitiateMultipartUploadRequest.preExecute(ozoneManager); + + Assert.assertNotEquals(omRequest, modifiedRequest); + Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest()); + Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest() + .getKeyArgs().getMultipartUploadID()); + Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest() + .getKeyArgs().getModificationTime() > 0); + + return modifiedRequest; + } + + /** + * Perform preExecute of Commit Multipart Upload request for given volume, + * bucket and keyName. + * @param volumeName + * @param bucketName + * @param keyName + * @param clientID + * @param multipartUploadID + * @param partNumber + * @return OMRequest - returned from preExecute. + */ + protected OMRequest doPreExecuteCommitMPU( + String volumeName, String bucketName, String keyName, + long clientID, String multipartUploadID, int partNumber) { + + // Just set dummy size + long dataSize = 100L; + OMRequest omRequest = + TestOMRequestUtils.createCommitPartMPURequest(volumeName, bucketName, + keyName, clientID, dataSize, multipartUploadID, partNumber); + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + new S3MultipartUploadCommitPartRequest(omRequest); + + + OMRequest modifiedRequest = + s3MultipartUploadCommitPartRequest.preExecute(ozoneManager); + + // UserInfo and modification time is set. + Assert.assertNotEquals(omRequest, modifiedRequest); + + return modifiedRequest; + } + + } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java new file mode 100644 index 00000000000..19d985de7c9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + +/** + * Tests S3 Multipart upload commit part request. + */ +public class TestS3MultipartUploadCommitPartRequest + extends TestS3MultipartRequest { + + @Test + public void testPreExecute() { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + doPreExecuteCommitMPU(volumeName, bucketName, keyName, Time.now(), + UUID.randomUUID().toString(), 1); + } + + + @Test + public void testValidateAndUpdateCacheSuccess() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + new S3InitiateMultipartUploadRequest(initiateMPURequest); + + OMClientResponse omClientResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L); + + long clientID = Time.now(); + String multipartUploadID = omClientResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + new S3MultipartUploadCommitPartRequest(commitMultipartRequest); + + // Add key to open key table. + TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, + keyName, clientID, HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, omMetadataManager); + + omClientResponse = + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L); + + + Assert.assertTrue(omClientResponse.getOMResponse().getStatus() + == OzoneManagerProtocolProtos.Status.OK); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + + Assert.assertNotNull( + omMetadataManager.getMultipartInfoTable().get(multipartKey)); + Assert.assertTrue(omMetadataManager.getMultipartInfoTable() + .get(multipartKey).getPartKeyInfoMap().size() == 1); + Assert.assertNull(omMetadataManager.getOpenKeyTable() + .get(omMetadataManager.getOpenKey(volumeName, bucketName, keyName, + clientID))); + + } + + @Test + public void testValidateAndUpdateCacheMultipartNotFound() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + + long clientID = Time.now(); + String multipartUploadID = UUID.randomUUID().toString(); + + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + new S3MultipartUploadCommitPartRequest(commitMultipartRequest); + + // Add key to open key table. + TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, + keyName, clientID, HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, omMetadataManager); + + OMClientResponse omClientResponse = + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L); + + + Assert.assertTrue(omClientResponse.getOMResponse().getStatus() + == OzoneManagerProtocolProtos.Status.NO_SUCH_MULTIPART_UPLOAD_ERROR); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + + Assert.assertNull( + omMetadataManager.getMultipartInfoTable().get(multipartKey)); + + } + + @Test + public void testValidateAndUpdateCacheKeyNotFound() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + + long clientID = Time.now(); + String multipartUploadID = UUID.randomUUID().toString(); + + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + // Don't add key to open table entry, and we are trying to commit this MPU + // part. It will fail with KEY_NOT_FOUND + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + new S3MultipartUploadCommitPartRequest(commitMultipartRequest); + + + OMClientResponse omClientResponse = + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L); + + Assert.assertTrue(omClientResponse.getOMResponse().getStatus() + == OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND); + + } + + + @Test + public void testValidateAndUpdateCacheBucketFound() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager); + + + long clientID = Time.now(); + String multipartUploadID = UUID.randomUUID().toString(); + + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + // Don't add key to open table entry, and we are trying to commit this MPU + // part. It will fail with KEY_NOT_FOUND + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + new S3MultipartUploadCommitPartRequest(commitMultipartRequest); + + + OMClientResponse omClientResponse = + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L); + + Assert.assertTrue(omClientResponse.getOMResponse().getStatus() + == OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND); + + } +}