HDDS-1736. Cleanup 2phase old HA code for Key requests. (#1038)

This commit is contained in:
Bharat Viswanadham 2019-07-15 21:51:59 -07:00 committed by GitHub
parent f77d54c243
commit 395cb3cfd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 10 additions and 283 deletions

View File

@ -221,7 +221,6 @@ public final class OmUtils {
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
case ApplyCreateKey:
case ApplyInitiateMultiPartUpload:
case CreateDirectory:
case CreateFile:

View File

@ -24,9 +24,7 @@ public enum OMAction implements AuditAction {
// WRITE Actions
ALLOCATE_BLOCK,
ADD_ALLOCATE_BLOCK,
ALLOCATE_KEY,
APPLY_ALLOCATE_KEY,
COMMIT_KEY,
CREATE_VOLUME,
CREATE_BUCKET,

View File

@ -20,16 +20,9 @@ package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
@ -49,39 +42,6 @@ public interface OzoneManagerHAProtocol {
*/
long saveRatisSnapshot() throws IOException;
/**
* Add a allocate block, it is assumed that the client is having an open
* key session going on. This block will be appended to this open key session.
* This will be called only during HA enabled OM, as during HA we get an
* allocated Block information, and add that information to OM DB.
*
* In HA the flow for allocateBlock is in StartTransaction allocateBlock
* will be called which returns block information, and in the
* applyTransaction addAllocateBlock will be called to add the block
* information to DB.
*
* @param args the key to append
* @param clientID the client identification
* @param keyLocation key location given by allocateBlock
* @return an allocated block
* @throws IOException
*/
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException;
/**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling
* applyKey in startTransaction.
*
* @param omKeyArgs
* @param keyInfo
* @param clientID
* @throws IOException
*/
void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
throws IOException;
/**
* Initiate multipart upload for the specified key.
*

View File

@ -60,7 +60,6 @@ enum Type {
ListKeys = 35;
CommitKey = 36;
AllocateBlock = 37;
ApplyCreateKey = 38;
CreateS3Bucket = 41;
DeleteS3Bucket = 42;
@ -125,7 +124,6 @@ message OMRequest {
optional ListKeysRequest listKeysRequest = 35;
optional CommitKeyRequest commitKeyRequest = 36;
optional AllocateBlockRequest allocateBlockRequest = 37;
optional ApplyCreateKeyRequest applyCreateKeyRequest = 38;
optional S3CreateBucketRequest createS3BucketRequest = 41;
@ -722,11 +720,6 @@ message CreateKeyResponse {
optional uint64 openVersion = 4;
}
message ApplyCreateKeyRequest {
required CreateKeyRequest createKeyRequest = 1;
required CreateKeyResponse createKeyResponse = 2;
}
message LookupKeyRequest {
required KeyArgs keyArgs = 1;
}

View File

@ -29,12 +29,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.utils.BackgroundService;
import java.io.IOException;
@ -83,19 +77,6 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
ExcludeList excludeList) throws IOException;
/**
* Ozone manager state machine call's this on an open key, to add allocated
* block to the tail of current block list of the open client.
*
* @param args the key to append
* @param clientID the client requesting block.
* @param keyLocation key location.
* @return the reference to the new block.
* @throws IOException
*/
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException;
/**
* Given the args of a key to put, write an open key entry to meta data.
*
@ -109,19 +90,6 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
*/
OpenKeySession openKey(OmKeyArgs args) throws IOException;
/**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling
* applyKey in startTransaction.
*
* @param omKeyArgs
* @param keyInfo
* @param clientID
* @throws IOException
*/
void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
throws IOException;
/**
* Look up an existing key. Return the info of the key to client side, which
* DistributedStorageHandler will use to access the data on datanode.

View File

@ -73,12 +73,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
@ -156,7 +150,6 @@ public class KeyManagerImpl implements KeyManager {
private final KeyProviderCryptoExtension kmsProvider;
private final boolean isRatisEnabled;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
@ -186,9 +179,6 @@ public class KeyManagerImpl implements KeyManager {
HDDS_BLOCK_TOKEN_ENABLED,
HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
this.kmsProvider = kmsProvider;
this.isRatisEnabled = conf.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
}
@Override
@ -266,36 +256,6 @@ public class KeyManagerImpl implements KeyManager {
}
}
@Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException {
Preconditions.checkNotNull(args);
Preconditions.checkNotNull(keyLocation);
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
validateBucket(volumeName, bucketName);
String openKey = metadataManager.getOpenKey(
volumeName, bucketName, keyName, clientID);
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
if (keyInfo == null) {
LOG.error("Allocate block for a key not in open status in meta store" +
" /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
throw new OMException("Open Key not found",
KEY_NOT_FOUND);
}
OmKeyLocationInfo omKeyLocationInfo =
OmKeyLocationInfo.getFromProtobuf(keyLocation);
keyInfo.appendNewBlocks(Collections.singletonList(omKeyLocationInfo), true);
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
return omKeyLocationInfo;
}
@Override
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
ExcludeList excludeList) throws IOException {
@ -322,13 +282,10 @@ public class KeyManagerImpl implements KeyManager {
List<OmKeyLocationInfo> locationInfos =
allocateBlock(keyInfo, excludeList, scmBlockSize);
// If om is not managing via ratis, write to db, otherwise write to DB
// will happen via ratis apply transaction.
if (!isRatisEnabled) {
keyInfo.appendNewBlocks(locationInfos, true);
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
}
keyInfo.appendNewBlocks(locationInfos, true);
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
return locationInfos.get(0);
}
@ -497,11 +454,8 @@ public class KeyManagerImpl implements KeyManager {
keyInfo.appendNewBlocks(locationInfos, true);
}
// When OM is not managed via ratis we should write in to Om db in
// openKey call.
if (!isRatisEnabled) {
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
}
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
}
private OmKeyInfo prepareKeyInfo(
@ -555,41 +509,6 @@ public class KeyManagerImpl implements KeyManager {
return createKeyInfo(args, locations, factor, type, size, encInfo);
}
public void applyOpenKey(KeyArgs omKeyArgs,
KeyInfo keyInfo, long clientID) throws IOException {
Preconditions.checkNotNull(omKeyArgs);
String volumeName = omKeyArgs.getVolumeName();
String bucketName = omKeyArgs.getBucketName();
// Do we need to call again validateBucket, as this is just called after
// start Transaction from applyTransaction. Can we remove this double
// check?
validateBucket(volumeName, bucketName);
metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
String keyName = omKeyArgs.getKeyName();
// TODO: here if on OM machines clocks are skewed and there is a chance
// for override of the openKey entries.
try {
String openKey = metadataManager.getOpenKey(
volumeName, bucketName, keyName, clientID);
OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
metadataManager.getOpenKeyTable().put(openKey,
omKeyInfo);
} catch (IOException ex) {
LOG.error("Apply Open Key failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
throw new OMException(ex.getMessage(),
ResultCodes.KEY_ALLOCATION_ERROR);
} finally {
metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
bucketName);
}
}
/**
* Create OmKeyInfo object.
* @param keyArgs

View File

@ -61,7 +61,6 @@ public class OMMetrics {
private @Metric MutableCounterLong numVolumeLists;
private @Metric MutableCounterLong numKeyCommits;
private @Metric MutableCounterLong numAllocateBlockCalls;
private @Metric MutableCounterLong numAddAllocateBlockCalls;
private @Metric MutableCounterLong numGetServiceLists;
private @Metric MutableCounterLong numListS3Buckets;
private @Metric MutableCounterLong numInitiateMultipartUploads;
@ -443,14 +442,6 @@ public class OMMetrics {
numBlockAllocateCallFails.incr();
}
public void incNumAddAllocateBlockCalls() {
numAddAllocateBlockCalls.incr();
}
public void incNumAddAllocateBlockFails() {
numAddAllocateBlockCallFails.incr();
}
public void incNumBucketListFails() {
numBucketListFails.incr();
}

View File

@ -84,10 +84,6 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
@ -2233,34 +2229,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
@Override
public void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
throws IOException {
// Do we need to check again Acl's for apply OpenKey call?
if(isAclEnabled) {
checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
omKeyArgs.getVolumeName(), omKeyArgs.getBucketName(),
omKeyArgs.getKeyName());
}
boolean auditSuccess = true;
try {
keyManager.applyOpenKey(omKeyArgs, keyInfo, clientID);
} catch (Exception ex) {
metrics.incNumKeyAllocateFails();
auditSuccess = false;
AUDIT.logWriteFailure(buildAuditMessageForFailure(
OMAction.APPLY_ALLOCATE_KEY,
(omKeyArgs == null) ? null : toAuditMap(omKeyArgs), ex));
throw ex;
} finally {
if(auditSuccess){
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
OMAction.APPLY_ALLOCATE_KEY, (omKeyArgs == null) ? null :
toAuditMap(omKeyArgs)));
}
}
}
private Map<String, String> toAuditMap(KeyArgs omKeyArgs) {
Map<String, String> auditMap = new LinkedHashMap<>();
auditMap.put(OzoneConsts.VOLUME, omKeyArgs.getVolumeName());
@ -2361,34 +2329,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
@Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException {
if(isAclEnabled) {
checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
args.getVolumeName(), args.getBucketName(), args.getKeyName());
}
boolean auditSuccess = true;
Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
args.toAuditMap();
auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
try {
metrics.incNumAddAllocateBlockCalls();
return keyManager.addAllocatedBlock(args, clientID, keyLocation);
} catch (Exception ex) {
metrics.incNumAddAllocateBlockFails();
auditSuccess = false;
AUDIT.logWriteFailure(buildAuditMessageForFailure(
OMAction.ADD_ALLOCATE_BLOCK, auditMap, ex));
throw ex;
} finally {
if(auditSuccess){
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
OMAction.ADD_ALLOCATE_BLOCK, auditMap));
}
}
}
/**
* Lookup a key.
*

View File

@ -57,8 +57,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
/**
* The OM StateMachine is the state machine for OM Ratis server. It is
* responsible for applying ratis committed transactions to
@ -277,17 +275,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.build();
}
/**
* Construct IOException message for failed requests in StartTransaction.
* @param omResponse
* @return
*/
private IOException constructExceptionForFailedRequest(
OMResponse omResponse) {
return new IOException(omResponse.getMessage() + " " +
STATUS_CODE + omResponse.getStatus());
}
/**
* Submits write request to OM and returns the response Message.
* @param request OMRequest

View File

@ -154,7 +154,7 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
long clientID = allocateBlockRequest.getClientID();
OMMetrics omMetrics = ozoneManager.getMetrics();
omMetrics.incNumAddAllocateBlockCalls();
omMetrics.incNumBlockAllocateCalls();
AuditLogger auditLogger = ozoneManager.getAuditLogger();

View File

@ -49,8 +49,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFile
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ApplyCreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoApplyInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
@ -215,11 +213,6 @@ public class OzoneManagerRequestHandler implements RequestHandler {
request.getCreateKeyRequest());
responseBuilder.setCreateKeyResponse(createKeyResponse);
break;
case ApplyCreateKey:
CreateKeyResponse applyKeyResponse =
applyCreateKey(request.getApplyCreateKeyRequest());
responseBuilder.setCreateKeyResponse(applyKeyResponse);
break;
case LookupKey:
LookupKeyResponse lookupKeyResponse = lookupKey(
request.getLookupKeyRequest());
@ -589,20 +582,6 @@ public class OzoneManagerRequestHandler implements RequestHandler {
return resp.build();
}
private CreateKeyResponse applyCreateKey(ApplyCreateKeyRequest request)
throws IOException {
CreateKeyRequest createKeyRequest = request.getCreateKeyRequest();
CreateKeyResponse createKeyResponse = request.getCreateKeyResponse();
impl.applyOpenKey(createKeyRequest.getKeyArgs(),
createKeyResponse.getKeyInfo(), createKeyResponse.getID());
// If applying to om DB successful just return createKeyResponse.
return createKeyResponse;
}
private LookupKeyResponse lookupKey(LookupKeyRequest request)
throws IOException {
LookupKeyResponse.Builder resp =
@ -744,16 +723,9 @@ public class OzoneManagerRequestHandler implements RequestHandler {
.setKeyName(keyArgs.getKeyName())
.build();
OmKeyLocationInfo newLocation;
if (request.hasKeyLocation()) {
newLocation =
impl.addAllocatedBlock(omKeyArgs, request.getClientID(),
request.getKeyLocation());
} else {
newLocation =
impl.allocateBlock(omKeyArgs, request.getClientID(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));
}
OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs,
request.getClientID(), ExcludeList.getFromProtoBuf(
request.getExcludeList()));
resp.setKeyLocation(newLocation.getProtobuf());