HDDS-1805. Implement S3 Initiate MPU request to use Cache and DoubleBuffer. (#1108)

This commit is contained in:
Bharat Viswanadham 2019-07-20 08:11:36 -07:00 committed by GitHub
parent e60f5e2572
commit 4aa76e327d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 813 additions and 145 deletions

View File

@ -221,7 +221,6 @@ public static boolean isReadOnly(
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
case ApplyInitiateMultiPartUpload:
case CreateDirectory:
case CreateFile:
case RemoveAcl:

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import java.io.IOException;
/**
@ -37,16 +34,4 @@ public interface OzoneManagerHAProtocol {
*/
long saveRatisSnapshot() throws IOException;
/**
* Initiate multipart upload for the specified key.
*
* This will be called only from applyTransaction.
* @param omKeyArgs
* @param multipartUploadID
* @return OmMultipartInfo
* @throws IOException
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException;
}

View File

@ -74,8 +74,6 @@ enum Type {
ServiceList = 51;
ApplyInitiateMultiPartUpload = 52;
GetDelegationToken = 61;
RenewDelegationToken = 62;
CancelDelegationToken = 63;
@ -138,7 +136,6 @@ message OMRequest {
optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50;
optional ServiceListRequest serviceListRequest = 51;
optional MultipartInfoApplyInitiateRequest initiateMultiPartUploadApplyRequest = 52;
optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@ -909,11 +906,7 @@ message S3ListBucketsResponse {
message MultipartInfoInitiateRequest {
required KeyArgs keyArgs = 1;
}
message MultipartInfoApplyInitiateRequest {
required KeyArgs keyArgs = 1;
required string multipartUploadID = 2;
}
message MultipartInfoInitiateResponse {

View File

@ -201,17 +201,6 @@ List<OmKeyInfo> listKeys(String volumeName,
*/
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
/**
* Initiate multipart upload for the specified key.
*
* @param keyArgs
* @param multipartUploadID
* @return MultipartInfo
* @throws IOException
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
String multipartUploadID) throws IOException;
/**
* Commit Multipart upload part file.
* @param omKeyArgs

View File

@ -874,15 +874,13 @@ public BackgroundService getDeletingService() {
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
IOException {
long time = Time.monotonicNowNanos();
String uploadID = UUID.randomUUID().toString() + "-" + time;
return applyInitiateMultipartUpload(omKeyArgs, uploadID);
Preconditions.checkNotNull(omKeyArgs);
String uploadID = UUID.randomUUID().toString() + "-" + UniqueId.next();
return createMultipartInfo(omKeyArgs, uploadID);
}
public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
private OmMultipartInfo createMultipartInfo(OmKeyArgs keyArgs,
String multipartUploadID) throws IOException {
Preconditions.checkNotNull(keyArgs);
Preconditions.checkNotNull(multipartUploadID);
String volumeName = keyArgs.getVolumeName();
String bucketName = keyArgs.getBucketName();
String keyName = keyArgs.getKeyName();

View File

@ -2698,28 +2698,6 @@ public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
}
}
@Override
public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
String multipartUploadID) throws IOException {
OmMultipartInfo multipartInfo;
metrics.incNumInitiateMultipartUploads();
try {
multipartInfo = keyManager.applyInitiateMultipartUpload(keyArgs,
multipartUploadID);
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null :
keyArgs.toAuditMap()));
} catch (IOException ex) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
OMAction.INITIATE_MULTIPART_UPLOAD,
(keyArgs == null) ? null : keyArgs.toAuditMap(), ex));
metrics.incNumInitiateMultipartUploadFails();
throw ex;
}
return multipartInfo;
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {

View File

@ -23,7 +23,6 @@
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
@ -33,16 +32,12 @@
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoApplyInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
@ -225,53 +220,11 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
private TransactionContext handleStartTransactionRequests(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
switch (omRequest.getCmdType()) {
case InitiateMultiPartUpload:
return handleInitiateMultipartUpload(raftClientRequest, omRequest);
default:
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.build();
}
}
private TransactionContext handleInitiateMultipartUpload(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
// Generate a multipart uploadID, and create a new request.
// When applyTransaction happen's all OM's use the same multipartUploadID
// for the key.
long time = Time.monotonicNowNanos();
String multipartUploadID = UUID.randomUUID().toString() + "-" + time;
MultipartInfoApplyInitiateRequest multipartInfoApplyInitiateRequest =
MultipartInfoApplyInitiateRequest.newBuilder()
.setKeyArgs(omRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs()).setMultipartUploadID(multipartUploadID).build();
OMRequest.Builder newOmRequest =
OMRequest.newBuilder().setCmdType(
OzoneManagerProtocolProtos.Type.ApplyInitiateMultiPartUpload)
.setInitiateMultiPartUploadApplyRequest(
multipartInfoApplyInitiateRequest)
.setClientId(omRequest.getClientId());
if (omRequest.hasTraceID()) {
newOmRequest.setTraceID(omRequest.getTraceID());
}
ByteString messageContent =
ByteString.copyFrom(newOmRequest.build().toByteArray());
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(messageContent)
.setLogData(raftClientRequest.getMessage().getContent())
.build();
}

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
@ -105,6 +106,8 @@ public static OMClientRequest createClientRequest(OMRequest omRequest) {
return new S3BucketCreateRequest(omRequest);
case DeleteS3Bucket:
return new S3BucketDeleteRequest(omRequest);
case InitiateMultiPartUpload:
return new S3InitiateMultipartUploadRequest(omRequest);
default:
// TODO: will update once all request types are implemented.
return null;

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart.S3InitiateMultipartUploadResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.UniqueId;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
/**
* Handles initiate multipart upload request.
*/
public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
private static final Logger LOG =
LoggerFactory.getLogger(S3InitiateMultipartUploadRequest.class);
public S3InitiateMultipartUploadRequest(OMRequest omRequest) {
super(omRequest);
}
@Override
public OMRequest preExecute(OzoneManager ozoneManager) {
MultipartInfoInitiateRequest multipartInfoInitiateRequest =
getOmRequest().getInitiateMultiPartUploadRequest();
Preconditions.checkNotNull(multipartInfoInitiateRequest);
OzoneManagerProtocolProtos.KeyArgs.Builder newKeyArgs =
multipartInfoInitiateRequest.getKeyArgs().toBuilder()
.setMultipartUploadID(UUID.randomUUID().toString() + "-" +
UniqueId.next()).setModificationTime(Time.now());
return getOmRequest().toBuilder()
.setUserInfo(getUserInfo())
.setInitiateMultiPartUploadRequest(
multipartInfoInitiateRequest.toBuilder().setKeyArgs(newKeyArgs))
.build();
}
@Override
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
long transactionLogIndex) {
MultipartInfoInitiateRequest multipartInfoInitiateRequest =
getOmRequest().getInitiateMultiPartUploadRequest();
OzoneManagerProtocolProtos.KeyArgs keyArgs =
multipartInfoInitiateRequest.getKeyArgs();
Preconditions.checkNotNull(keyArgs.getMultipartUploadID());
String volumeName = keyArgs.getVolumeName();
String bucketName = keyArgs.getBucketName();
String keyName = keyArgs.getKeyName();
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
ozoneManager.getMetrics().incNumInitiateMultipartUploads();
boolean acquiredBucketLock = false;
IOException exception = null;
OmMultipartKeyInfo multipartKeyInfo = null;
OmKeyInfo omKeyInfo = null;
try {
// check Acl
if (ozoneManager.getAclsEnabled()) {
checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
volumeName, bucketName, keyName);
}
acquiredBucketLock =
omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
bucketName);
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload, who
// ever finally commit the key, we see that key in ozone. Suppose if we
// don't add id, and use the same key /volume/bucket/key, when multiple
// users try to upload the key, we update the parts of the key's from
// multiple users to same key, and the key output can be a mix of the
// parts from multiple users.
// So on same key if multiple time multipart upload is initiated we
// store multiple entries in the openKey Table.
// Checked AWS S3, when we try to run multipart upload, each time a
// new uploadId is returned. And also even if a key exist when initiate
// multipart upload request is received, it returns multipart upload id
// for the key.
String multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, keyArgs.getMultipartUploadID());
// Not checking if there is an already key for this in the keyTable, as
// during final complete multipart upload we take care of this. AWS S3
// behavior is also like this, even when key exists in a bucket, user
// can still initiate MPU.
multipartKeyInfo = new OmMultipartKeyInfo(
keyArgs.getMultipartUploadID(), new HashMap<>());
omKeyInfo = new OmKeyInfo.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setReplicationType(keyArgs.getType())
.setReplicationFactor(keyArgs.getFactor())
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>())))
.setAcls(keyArgs.getAclsList())
.build();
// Add to cache
omMetadataManager.getOpenKeyTable().addCacheEntry(
new CacheKey<>(multipartKey),
new CacheValue<>(Optional.of(omKeyInfo), transactionLogIndex));
omMetadataManager.getMultipartInfoTable().addCacheEntry(
new CacheKey<>(multipartKey),
new CacheValue<>(Optional.of(multipartKeyInfo), transactionLogIndex));
} catch (IOException ex) {
exception = ex;
} finally {
if (acquiredBucketLock) {
omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
bucketName);
}
}
OMResponse.Builder omResponse = OMResponse.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload)
.setStatus(OzoneManagerProtocolProtos.Status.OK)
.setSuccess(true);
// audit log
auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
OMAction.INITIATE_MULTIPART_UPLOAD, buildKeyArgsAuditMap(keyArgs),
exception, getOmRequest().getUserInfo()));
if (exception == null) {
LOG.debug("S3 InitiateMultipart Upload request for Key {} in " +
"Volume/Bucket {}/{} is successfully completed", keyName,
volumeName, bucketName);
return new S3InitiateMultipartUploadResponse(multipartKeyInfo, omKeyInfo,
omResponse.setInitiateMultiPartUploadResponse(
MultipartInfoInitiateResponse.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setMultipartUploadID(keyArgs.getMultipartUploadID()))
.build());
} else {
ozoneManager.getMetrics().incNumInitiateMultipartUploadFails();
LOG.error("S3 InitiateMultipart Upload request for Key {} in " +
"Volume/Bucket {}/{} is failed", keyName, volumeName, bucketName,
exception);
return new S3InitiateMultipartUploadResponse(null, null,
createErrorOMResponse(omResponse, exception));
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Package contains classes related to S3 multipart upload requests.
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.ozone.om.response.s3.multipart;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.utils.db.BatchOperation;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
/**
* Response for S3 Initiate Multipart Upload request.
*/
public class S3InitiateMultipartUploadResponse extends OMClientResponse {
private OmMultipartKeyInfo omMultipartKeyInfo;
private OmKeyInfo omKeyInfo;
public S3InitiateMultipartUploadResponse(
@Nullable OmMultipartKeyInfo omMultipartKeyInfo,
@Nullable OmKeyInfo omKeyInfo,
@Nonnull OzoneManagerProtocolProtos.OMResponse omResponse) {
super(omResponse);
this.omMultipartKeyInfo = omMultipartKeyInfo;
this.omKeyInfo = omKeyInfo;
}
@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
// For OmResponse with failure, this should do nothing. This method is
// not called in failure scenario in OM code.
if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
String multipartKey =
omMetadataManager.getMultipartKey(omKeyInfo.getVolumeName(),
omKeyInfo.getBucketName(), omKeyInfo.getKeyName(),
omMultipartKeyInfo.getUploadID());
omMetadataManager.getOpenKeyTable().putWithBatch(batchOperation,
multipartKey, omKeyInfo);
omMetadataManager.getMultipartInfoTable().putWithBatch(batchOperation,
multipartKey, omMultipartKeyInfo);
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Package contains classes related to S3 multipart upload responses.
*/
package org.apache.hadoop.ozone.om.response.s3.multipart;

View File

@ -69,6 +69,7 @@ public OMResponse handleApplyTransaction(OMRequest omRequest,
case PurgeKeys:
case CreateS3Bucket:
case DeleteS3Bucket:
case InitiateMultiPartUpload:
//TODO: We don't need to pass transactionID, this will be removed when
// complete write requests is changed to new model. And also we can
// return OMClientResponse, then adding to doubleBuffer can be taken

View File

@ -49,8 +49,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoApplyInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse;
@ -270,13 +268,6 @@ public OMResponse handle(OMRequest request) {
responseBuilder.setInitiateMultiPartUploadResponse(
multipartInfoInitiateResponse);
break;
case ApplyInitiateMultiPartUpload:
MultipartInfoInitiateResponse response =
applyInitiateMultiPartUpload(
request.getInitiateMultiPartUploadApplyRequest());
responseBuilder.setInitiateMultiPartUploadResponse(
response);
break;
case CommitMultiPartUpload:
MultipartCommitUploadPartResponse commitUploadPartResponse =
commitMultipartUploadPart(
@ -810,32 +801,6 @@ private MultipartInfoInitiateResponse initiateMultiPartUpload(
return resp.build();
}
private MultipartInfoInitiateResponse applyInitiateMultiPartUpload(
MultipartInfoApplyInitiateRequest request) throws IOException {
MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
.newBuilder();
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setType(keyArgs.getType())
.setAcls(keyArgs.getAclsList().stream().map(a ->
OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
.setFactor(keyArgs.getFactor())
.build();
OmMultipartInfo multipartInfo =
impl.applyInitiateMultipartUpload(omKeyArgs,
request.getMultipartUploadID());
resp.setVolumeName(multipartInfo.getVolumeName());
resp.setBucketName(multipartInfo.getBucketName());
resp.setKeyName(multipartInfo.getKeyName());
resp.setMultipartUploadID(multipartInfo.getUploadID());
return resp.build();
}
private MultipartCommitUploadPartResponse commitMultipartUploadPart(
MultipartCommitUploadPartRequest request) throws IOException {
MultipartCommitUploadPartResponse.Builder resp =

View File

@ -35,6 +35,10 @@
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -292,4 +296,24 @@ public static String deleteKey(String ozoneKey,
return deletedKeyName;
}
/**
* Create OMRequest which encapsulates InitiateMultipartUpload request.
* @param volumeName
* @param bucketName
* @param keyName
*/
public static OMRequest createInitiateMPURequest(String volumeName,
String bucketName, String keyName) {
MultipartInfoInitiateRequest
multipartInfoInitiateRequest =
MultipartInfoInitiateRequest.newBuilder().setKeyArgs(
KeyArgs.newBuilder().setVolumeName(volumeName).setKeyName(keyName)
.setBucketName(bucketName)).build();
return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
.setCmdType(OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload)
.setInitiateMultiPartUploadRequest(multipartInfoInitiateRequest)
.build();
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
/**
* Tests S3 Initiate Multipart Upload request.
*/
public class TestS3InitiateMultipartUploadRequest
extends TestS3MultipartRequest {
@Test
public void testPreExecute() {
doPreExecute(UUID.randomUUID().toString(), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
}
@Test
public void testValidateAndUpdateCache() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
// Add volume and bucket to DB.
TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);
OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(modifiedRequest);
OMClientResponse omClientResponse =
s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
100L);
Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omClientResponse.getOMResponse().getStatus());
String multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getMultipartUploadID());
Assert.assertNotNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
Assert.assertNotNull(omMetadataManager.getMultipartInfoTable()
.get(multipartKey));
Assert.assertEquals(modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getMultipartUploadID(),
omMetadataManager.getMultipartInfoTable().get(multipartKey)
.getUploadID());
Assert.assertEquals(modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getModificationTime(),
omMetadataManager.getOpenKeyTable().get(multipartKey)
.getModificationTime());
Assert.assertEquals(modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getModificationTime(),
omMetadataManager.getOpenKeyTable().get(multipartKey)
.getCreationTime());
}
@Test
public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(modifiedRequest);
OMClientResponse omClientResponse =
s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
100L);
Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND,
omClientResponse.getOMResponse().getStatus());
String multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getMultipartUploadID());
Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
Assert.assertNull(omMetadataManager.getMultipartInfoTable()
.get(multipartKey));
}
@Test
public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(modifiedRequest);
OMClientResponse omClientResponse =
s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
100L);
Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND,
omClientResponse.getOMResponse().getStatus());
String multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getMultipartUploadID());
Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
Assert.assertNull(omMetadataManager.getMultipartInfoTable()
.get(multipartKey));
}
private OMRequest doPreExecute(String volumeName, String bucketName,
String keyName) {
OMRequest omRequest =
TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName,
keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(omRequest);
OMRequest modifiedRequest =
s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
Assert.assertNotEquals(omRequest, modifiedRequest);
Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest());
Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getMultipartUploadID());
Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs().getModificationTime() > 0);
return modifiedRequest;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
/**
* Base test class for S3 Multipart upload request.
*/
@SuppressWarnings("visibilitymodifier")
public class TestS3MultipartRequest {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
protected OzoneManager ozoneManager;
protected OMMetrics omMetrics;
protected OMMetadataManager omMetadataManager;
protected AuditLogger auditLogger;
@Before
public void setup() throws Exception {
ozoneManager = Mockito.mock(OzoneManager.class);
omMetrics = OMMetrics.create();
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
folder.newFolder().getAbsolutePath());
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
when(ozoneManager.getMetrics()).thenReturn(omMetrics);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
auditLogger = Mockito.mock(AuditLogger.class);
when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
}
@After
public void stop() {
omMetrics.unRegister();
Mockito.framework().clearInlineMocks();
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Package contains test classes for S3 MPU requests.
*/
package org.apache.hadoop.ozone.om.request.s3.multipart;

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.ozone.om.response.s3.multipart;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.utils.db.BatchOperation;
/**
* Base test class for S3 MPU response.
*/
@SuppressWarnings("VisibilityModifier")
public class TestS3InitiateMultipartResponse {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
protected OMMetadataManager omMetadataManager;
protected BatchOperation batchOperation;
@Before
public void setup() throws Exception {
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
folder.newFolder().getAbsolutePath());
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
batchOperation = omMetadataManager.getStore().initBatchOperation();
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.ozone.om.response.s3.multipart;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.util.Time;
/**
* Class tests S3 Initiate MPU response.
*/
public class TestS3InitiateMultipartUploadResponse
extends TestS3InitiateMultipartResponse {
@Test
public void addDBToBatch() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
String multipartUploadID = UUID.randomUUID().toString();
OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
multipartUploadID, new HashMap<>());
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setCreationTime(Time.now())
.setModificationTime(Time.now())
.setReplicationType(HddsProtos.ReplicationType.RATIS)
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>())))
.build();
OMResponse omResponse = OMResponse.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload)
.setStatus(OzoneManagerProtocolProtos.Status.OK)
.setSuccess(true).setInitiateMultiPartUploadResponse(
MultipartInfoInitiateResponse.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setMultipartUploadID(multipartUploadID)).build();
S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponse =
new S3InitiateMultipartUploadResponse(multipartKeyInfo, omKeyInfo,
omResponse);
s3InitiateMultipartUploadResponse.addToDBBatch(omMetadataManager,
batchOperation);
// Do manual commit and see whether addToBatch is successful or not.
omMetadataManager.getStore().commitBatchOperation(batchOperation);
String multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, multipartUploadID);
Assert.assertNotNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
Assert.assertNotNull(omMetadataManager.getMultipartInfoTable()
.get(multipartKey));
Assert.assertEquals(multipartUploadID,
omMetadataManager.getMultipartInfoTable().get(multipartKey)
.getUploadID());
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Package contains test classes for S3 MPU responses.
*/
package org.apache.hadoop.ozone.om.response.s3.multipart;