HDDS-1262. In OM HA OpenKey call Should happen only leader OM. (#626)

This commit is contained in:
Bharat Viswanadham 2019-03-26 21:48:01 -07:00 committed by GitHub
parent 18c57cf046
commit eef8cae7cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 624 additions and 29 deletions

View File

@ -210,6 +210,8 @@ public final class OmUtils {
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
case ApplyCreateKey:
case ApplyInitiateMultiPartUpload:
return false;
default:
LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);

View File

@ -26,6 +26,7 @@ public enum OMAction implements AuditAction {
ALLOCATE_BLOCK,
ADD_ALLOCATE_BLOCK,
ALLOCATE_KEY,
APPLY_ALLOCATE_KEY,
COMMIT_KEY,
CREATE_VOLUME,
CREATE_BUCKET,

View File

@ -23,6 +23,8 @@ import java.io.IOException;
* Exception thrown by Ozone Manager.
*/
public class OMException extends IOException {
public static final String STATUS_CODE = "STATUS_CODE=";
private final OMException.ResultCodes result;
/**

View File

@ -20,6 +20,11 @@ package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
@ -52,4 +57,29 @@ public interface OzoneManagerHAProtocol {
KeyLocation keyLocation) throws IOException;
/**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling
* applyKey in startTransaction.
*
* @param omKeyArgs
* @param keyInfo
* @param clientID
* @throws IOException
*/
void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
throws IOException;
/**
* Initiate multipart upload for the specified key.
*
* This will be called only from applyTransaction.
* @param omKeyArgs
* @param multipartUploadID
* @return OmMultipartInfo
* @throws IOException
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException;
}

View File

@ -60,6 +60,7 @@ enum Type {
ListKeys = 35;
CommitKey = 36;
AllocateBlock = 37;
ApplyCreateKey = 38;
CreateS3Bucket = 41;
DeleteS3Bucket = 42;
@ -74,6 +75,8 @@ enum Type {
ServiceList = 51;
ApplyInitiateMultiPartUpload = 52;
GetDelegationToken = 61;
RenewDelegationToken = 62;
CancelDelegationToken = 63;
@ -110,6 +113,8 @@ message OMRequest {
optional ListKeysRequest listKeysRequest = 35;
optional CommitKeyRequest commitKeyRequest = 36;
optional AllocateBlockRequest allocateBlockRequest = 37;
optional ApplyCreateKeyRequest applyCreateKeyRequest = 38;
optional S3CreateBucketRequest createS3BucketRequest = 41;
optional S3DeleteBucketRequest deleteS3BucketRequest = 42;
@ -123,6 +128,7 @@ message OMRequest {
optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50;
optional ServiceListRequest serviceListRequest = 51;
optional MultipartInfoApplyInitiateRequest initiateMultiPartUploadApplyRequest = 52;
optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@ -555,6 +561,11 @@ message CreateKeyResponse {
optional uint64 openVersion = 4;
}
message ApplyCreateKeyRequest {
required CreateKeyRequest createKeyRequest = 1;
required CreateKeyResponse createKeyResponse = 2;
}
message LookupKeyRequest {
required KeyArgs keyArgs = 1;
}
@ -722,6 +733,11 @@ message MultipartInfoInitiateRequest {
required KeyArgs keyArgs = 1;
}
message MultipartInfoApplyInitiateRequest {
required KeyArgs keyArgs = 1;
required string multipartUploadID = 2;
}
message MultipartInfoInitiateResponse {
required string volumeName = 1;
required string bucketName = 2;

View File

@ -18,14 +18,21 @@ package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
@ -42,7 +49,9 @@ import org.junit.rules.Timeout;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
@ -120,6 +129,7 @@ public class TestOzoneManagerHA {
@Test
public void testAllOMNodesRunning() throws Exception {
createVolumeTest(true);
createKeyTest(true);
}
/**
@ -131,6 +141,8 @@ public class TestOzoneManagerHA {
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
createVolumeTest(true);
createKeyTest(true);
}
/**
@ -143,8 +155,181 @@ public class TestOzoneManagerHA {
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
createVolumeTest(false);
createKeyTest(false);
}
private OzoneBucket setupBucket() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
String bucketName = UUID.randomUUID().toString();
retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName));
return ozoneBucket;
}
@Test
public void testMultipartUpload() throws Exception {
// Happy scenario when all OM's are up.
OzoneBucket ozoneBucket = setupBucket();
String keyName = UUID.randomUUID().toString();
String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
}
@Test
public void testMultipartUploadWithOneOmNodeDown() throws Exception {
OzoneBucket ozoneBucket = setupBucket();
String keyName = UUID.randomUUID().toString();
String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
// After initiate multipartupload, shutdown leader OM.
// Stop leader OM, to see when the OM leader changes
// multipart upload is happening successfully or not.
OMFailoverProxyProvider omFailoverProxyProvider =
objectStore.getClientProxy().getOMProxyProvider();
// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Stop one of the ozone manager, to see when the OM leader changes
// multipart upload is happening successfully or not.
cluster.stopOzoneManager(leaderOMNodeId);
createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
String newLeaderOMNodeId =
omFailoverProxyProvider.getCurrentProxyOMNodeId();
Assert.assertTrue(leaderOMNodeId != newLeaderOMNodeId);
}
private String initiateMultipartUpload(OzoneBucket ozoneBucket,
String keyName) throws Exception {
OmMultipartInfo omMultipartInfo =
ozoneBucket.initiateMultipartUpload(keyName,
ReplicationType.RATIS,
ReplicationFactor.ONE);
String uploadID = omMultipartInfo.getUploadID();
Assert.assertTrue(uploadID != null);
return uploadID;
}
private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket,
String keyName, String uploadID) throws Exception {
String value = "random data";
OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
keyName, value.length(), 1, uploadID);
ozoneOutputStream.write(value.getBytes(), 0, value.length());
ozoneOutputStream.close();
Map<Integer, String> partsMap = new HashMap<>();
partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getPartName());
OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap);
Assert.assertTrue(omMultipartUploadCompleteInfo != null);
Assert.assertTrue(omMultipartUploadCompleteInfo.getHash() != null);
OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
byte[] fileContent = new byte[value.getBytes().length];
ozoneInputStream.read(fileContent);
Assert.assertEquals(value, new String(fileContent));
}
private void createKeyTest(boolean checkSuccess) throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
try {
objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
retVolumeinfo.createBucket(bucketName);
OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName));
String value = "random data";
OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
value.length(), ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, new HashMap<>());
ozoneOutputStream.write(value.getBytes(), 0, value.length());
ozoneOutputStream.close();
OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
byte[] fileContent = new byte[value.getBytes().length];
ozoneInputStream.read(fileContent);
Assert.assertEquals(value, new String(fileContent));
} catch (ConnectException | RemoteException e) {
if (!checkSuccess) {
// If the last OM to be tried by the RetryProxy is down, we would get
// ConnectException. Otherwise, we would get a RemoteException from the
// last running OM as it would fail to get a quorum.
if (e instanceof RemoteException) {
GenericTestUtils.assertExceptionContains(
"RaftRetryFailureException", e);
}
} else {
throw e;
}
}
}
/**
* Create a volume and test its attribute.
*/
@ -186,6 +371,8 @@ public class TestOzoneManagerHA {
}
}
/**
* Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster.

View File

@ -29,7 +29,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.utils.BackgroundService;
import java.io.IOException;
@ -89,7 +94,7 @@ public interface KeyManager extends OzoneManagerFS {
* @throws IOException
*/
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException;
KeyLocation keyLocation) throws IOException;
/**
* Given the args of a key to put, write an open key entry to meta data.
@ -104,6 +109,19 @@ public interface KeyManager extends OzoneManagerFS {
*/
OpenKeySession openKey(OmKeyArgs args) throws IOException;
/**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling
* applyKey in startTransaction.
*
* @param omKeyArgs
* @param keyInfo
* @param clientID
* @throws IOException
*/
void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
throws IOException;
/**
* Look up an existing key. Return the info of the key to client side, which
* DistributedStorageHandler will use to access the data on datanode.
@ -213,6 +231,17 @@ public interface KeyManager extends OzoneManagerFS {
*/
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
/**
* Initiate multipart upload for the specified key.
*
* @param keyArgs
* @param multipartUploadID
* @return MultipartInfo
* @throws IOException
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
String multipartUploadID) throws IOException;
/**
* Commit Multipart upload part file.
* @param omKeyArgs

View File

@ -65,7 +65,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -233,7 +238,7 @@ public class KeyManagerImpl implements KeyManager {
@Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException {
KeyLocation keyLocation) throws IOException {
Preconditions.checkNotNull(args);
Preconditions.checkNotNull(keyLocation);
@ -518,10 +523,49 @@ public class KeyManagerImpl implements KeyManager {
allocateBlock(keyInfo, new ExcludeList(), args.getDataSize());
keyInfo.appendNewBlocks(locationInfos);
}
// When OM is not managed via ratis we should write in to Om db in
// openKey call.
if (!isRatisEnabled) {
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
}
return new OpenKeySession(currentTime, keyInfo, openVersion);
}
public void applyOpenKey(KeyArgs omKeyArgs,
KeyInfo keyInfo, long clientID) throws IOException {
Preconditions.checkNotNull(omKeyArgs);
String volumeName = omKeyArgs.getVolumeName();
String bucketName = omKeyArgs.getBucketName();
// Do we need to call again validateBucket, as this is just called after
// start Transaction from applyTransaction. Can we remove this double
// check?
validateBucket(volumeName, bucketName);
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
String keyName = omKeyArgs.getKeyName();
// TODO: here if on OM machines clocks are skewed and there is a chance
// for override of the openKey entries.
try {
String openKey = metadataManager.getOpenKey(
volumeName, bucketName, keyName, clientID);
OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
metadataManager.getOpenKeyTable().put(openKey,
omKeyInfo);
} catch (IOException ex) {
LOG.error("Apply Open Key failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex);
throw new OMException(ex.getMessage(),
ResultCodes.KEY_ALLOCATION_ERROR);
} finally {
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
/**
* Create OmKeyInfo object.
* @param keyArgs
@ -826,17 +870,22 @@ public class KeyManagerImpl implements KeyManager {
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
IOException {
Preconditions.checkNotNull(omKeyArgs);
String volumeName = omKeyArgs.getVolumeName();
String bucketName = omKeyArgs.getBucketName();
String keyName = omKeyArgs.getKeyName();
long time = Time.monotonicNowNanos();
String uploadID = UUID.randomUUID().toString() + "-" + time;
return applyInitiateMultipartUpload(omKeyArgs, uploadID);
}
public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
String multipartUploadID) throws IOException {
Preconditions.checkNotNull(keyArgs);
Preconditions.checkNotNull(multipartUploadID);
String volumeName = keyArgs.getVolumeName();
String bucketName = keyArgs.getBucketName();
String keyName = keyArgs.getKeyName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
validateS3Bucket(volumeName, bucketName);
try {
long time = Time.monotonicNowNanos();
String uploadID = UUID.randomUUID().toString() + "-" + Long.toString(
time);
// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload, who
@ -852,24 +901,24 @@ public class KeyManagerImpl implements KeyManager {
// new uploadId is returned.
String multipartKey = metadataManager.getMultipartKey(volumeName,
bucketName, keyName, uploadID);
bucketName, keyName, multipartUploadID);
// Not checking if there is an already key for this in the keyTable, as
// during final complete multipart upload we take care of this.
Map<Integer, PartKeyInfo> partKeyInfoMap = new HashMap<>();
OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID,
partKeyInfoMap);
OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
multipartUploadID, partKeyInfoMap);
List<OmKeyLocationInfo> locations = new ArrayList<>();
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
.setVolumeName(omKeyArgs.getVolumeName())
.setBucketName(omKeyArgs.getBucketName())
.setKeyName(omKeyArgs.getKeyName())
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setCreationTime(Time.now())
.setModificationTime(Time.now())
.setReplicationType(omKeyArgs.getType())
.setReplicationFactor(omKeyArgs.getFactor())
.setReplicationType(keyArgs.getType())
.setReplicationFactor(keyArgs.getFactor())
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, locations)))
.build();
@ -882,7 +931,8 @@ public class KeyManagerImpl implements KeyManager {
metadataManager.getOpenKeyTable().putWithBatch(batch,
multipartKey, omKeyInfo);
store.commitBatchOperation(batch);
return new OmMultipartInfo(volumeName, bucketName, keyName, uploadID);
return new OmMultipartInfo(volumeName, bucketName, keyName,
multipartUploadID);
}
} catch (IOException ex) {
LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " +

View File

@ -72,7 +72,12 @@ import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -1985,6 +1990,51 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
@Override
public void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
throws IOException {
// Do we need to check again Acl's for apply OpenKey call?
if(isAclEnabled) {
checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
omKeyArgs.getVolumeName(), omKeyArgs.getBucketName(),
omKeyArgs.getKeyName());
}
boolean auditSuccess = true;
try {
keyManager.applyOpenKey(omKeyArgs, keyInfo, clientID);
} catch (Exception ex) {
metrics.incNumKeyAllocateFails();
auditSuccess = false;
AUDIT.logWriteFailure(buildAuditMessageForFailure(
OMAction.APPLY_ALLOCATE_KEY,
(omKeyArgs == null) ? null : toAuditMap(omKeyArgs), ex));
throw ex;
} finally {
if(auditSuccess){
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
OMAction.APPLY_ALLOCATE_KEY, (omKeyArgs == null) ? null :
toAuditMap(omKeyArgs)));
}
}
}
private Map<String, String> toAuditMap(KeyArgs omKeyArgs) {
Map<String, String> auditMap = new LinkedHashMap<>();
auditMap.put(OzoneConsts.VOLUME, omKeyArgs.getVolumeName());
auditMap.put(OzoneConsts.BUCKET, omKeyArgs.getBucketName());
auditMap.put(OzoneConsts.KEY, omKeyArgs.getKeyName());
auditMap.put(OzoneConsts.DATA_SIZE,
String.valueOf(omKeyArgs.getDataSize()));
auditMap.put(OzoneConsts.REPLICATION_TYPE,
omKeyArgs.hasType() ? omKeyArgs.getType().name() : null);
auditMap.put(OzoneConsts.REPLICATION_FACTOR,
omKeyArgs.hasFactor() ? omKeyArgs.getFactor().name() : null);
auditMap.put(OzoneConsts.KEY_LOCATION_INFO,
(omKeyArgs.getKeyLocationsList() != null) ?
omKeyArgs.getKeyLocationsList().toString() : null);
return auditMap;
}
@Override
public void commitKey(OmKeyArgs args, long clientID)
throws IOException {
@ -2474,6 +2524,28 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
@Override
public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs,
String multipartUploadID) throws IOException {
OmMultipartInfo multipartInfo;
metrics.incNumInitiateMultipartUploads();
try {
multipartInfo = keyManager.applyInitiateMultipartUpload(keyArgs,
multipartUploadID);
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null :
keyArgs.toAuditMap()));
} catch (IOException ex) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
OMAction.INITIATE_MULTIPART_UPLOAD,
(keyArgs == null) ? null : keyArgs.toAuditMap(), ex));
metrics.incNumInitiateMultipartUploadFails();
throw ex;
}
return multipartInfo;
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -40,6 +41,7 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
@ -49,6 +51,8 @@ import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
/**
* OM Ratis client to interact with OM Ratis server endpoint.
*/
@ -128,10 +132,31 @@ public final class OzoneManagerRatisClient implements Closeable {
CompletableFuture<OMResponse> reply = sendCommandAsync(request);
return reply.get();
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof StateMachineException) {
OMResponse.Builder omResponse = OMResponse.newBuilder();
omResponse.setCmdType(request.getCmdType());
omResponse.setSuccess(false);
omResponse.setMessage(e.getCause().getMessage());
omResponse.setStatus(parseErrorStatus(e.getCause().getMessage()));
return omResponse.build();
}
throw new ServiceException(e);
}
}
private OzoneManagerProtocolProtos.Status parseErrorStatus(String message) {
if (message.contains(STATUS_CODE)) {
String errorCode = message.substring(message.indexOf(STATUS_CODE) +
STATUS_CODE.length());
LOG.debug("Parsing error message for error code " +
errorCode);
return OzoneManagerProtocolProtos.Status.valueOf(errorCode.trim());
} else {
return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
}
}
/**
* Sends a given command to server gets a waitable future back.
*

View File

@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
@ -29,12 +30,15 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoApplyInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
@ -48,6 +52,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
/**
* The OM StateMachine is the state machine for OM Ratis server. It is
* responsible for applying ratis committed transactions to
@ -108,11 +114,67 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
ctxt.setException(ioe);
return ctxt;
}
return handleStartTransactionRequests(raftClientRequest, omRequest);
if (omRequest.getCmdType() ==
OzoneManagerProtocolProtos.Type.AllocateBlock) {
return handleAllocateBlock(raftClientRequest, omRequest);
}
/**
* Handle the RaftClientRequest and return TransactionContext object.
* @param raftClientRequest
* @param omRequest
* @return TransactionContext
*/
private TransactionContext handleStartTransactionRequests(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
switch (omRequest.getCmdType()) {
case AllocateBlock:
return handleAllocateBlock(raftClientRequest, omRequest);
case CreateKey:
return handleCreateKeyRequest(raftClientRequest, omRequest);
case InitiateMultiPartUpload:
return handleInitiateMultipartUpload(raftClientRequest, omRequest);
default:
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.build();
}
}
private TransactionContext handleInitiateMultipartUpload(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
// Generate a multipart uploadID, and create a new request.
// When applyTransaction happen's all OM's use the same multipartUploadID
// for the key.
long time = Time.monotonicNowNanos();
String multipartUploadID = UUID.randomUUID().toString() + "-" + time;
MultipartInfoApplyInitiateRequest multipartInfoApplyInitiateRequest =
MultipartInfoApplyInitiateRequest.newBuilder()
.setKeyArgs(omRequest.getInitiateMultiPartUploadRequest()
.getKeyArgs()).setMultipartUploadID(multipartUploadID).build();
OMRequest.Builder newOmRequest =
OMRequest.newBuilder().setCmdType(
OzoneManagerProtocolProtos.Type.ApplyInitiateMultiPartUpload)
.setInitiateMultiPartUploadApplyRequest(
multipartInfoApplyInitiateRequest)
.setClientId(omRequest.getClientId());
if (omRequest.hasTraceID()) {
newOmRequest.setTraceID(omRequest.getTraceID());
}
ByteString messageContent =
ByteString.copyFrom(newOmRequest.build().toByteArray());
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
@ -121,6 +183,61 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.build();
}
/**
* Handle createKey Request, which needs a special handling. This request
* needs to be executed on the leader, and the response received from this
* request we need to create a ApplyKeyRequest and create a
* TransactionContext object.
*/
private TransactionContext handleCreateKeyRequest(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
OMResponse omResponse = handler.handle(omRequest);
// TODO: if not success should we retry depending on the error if it is
// retriable?
if (!omResponse.getSuccess()) {
TransactionContext transactionContext = TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.build();
transactionContext.setException(
constructExceptionForFailedRequest(omResponse));
return transactionContext;
}
// Get original request
OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
omRequest.getCreateKeyRequest();
// Create Applykey Request.
OzoneManagerProtocolProtos.ApplyCreateKeyRequest applyCreateKeyRequest =
OzoneManagerProtocolProtos.ApplyCreateKeyRequest.newBuilder()
.setCreateKeyRequest(createKeyRequest)
.setCreateKeyResponse(omResponse.getCreateKeyResponse()).build();
OMRequest.Builder newOmRequest =
OMRequest.newBuilder().setCmdType(
OzoneManagerProtocolProtos.Type.ApplyCreateKey)
.setApplyCreateKeyRequest(applyCreateKeyRequest)
.setClientId(omRequest.getClientId());
if (omRequest.hasTraceID()) {
newOmRequest.setTraceID(omRequest.getTraceID());
}
ByteString messageContent =
ByteString.copyFrom(newOmRequest.build().toByteArray());
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(messageContent)
.build();
}
/**
* Handle AllocateBlock Request, which needs a special handling. This
* request needs to be executed on the leader, where it connects to SCM and
@ -148,9 +265,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.build();
IOException ioe = new IOException(omResponse.getMessage() +
" Status code " + omResponse.getStatus());
transactionContext.setException(ioe);
transactionContext.setException(
constructExceptionForFailedRequest(omResponse));
return transactionContext;
}
@ -181,6 +297,17 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
}
/**
* Construct IOException message for failed requests in StartTransaction.
* @param omResponse
* @return
*/
private IOException constructExceptionForFailedRequest(
OMResponse omResponse) {
return new IOException(omResponse.getMessage() + " " +
STATUS_CODE + omResponse.getStatus());
}
/*
* Apply a committed log entry to the state machine.
*/

View File

@ -47,6 +47,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFile
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ApplyCreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoApplyInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse;
@ -205,6 +209,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
request.getCreateKeyRequest());
responseBuilder.setCreateKeyResponse(createKeyResponse);
break;
case ApplyCreateKey:
CreateKeyResponse applyKeyResponse =
applyCreateKey(request.getApplyCreateKeyRequest());
responseBuilder.setCreateKeyResponse(applyKeyResponse);
break;
case LookupKey:
LookupKeyResponse lookupKeyResponse = lookupKey(
request.getLookupKeyRequest());
@ -262,6 +271,13 @@ public class OzoneManagerRequestHandler implements RequestHandler {
responseBuilder.setInitiateMultiPartUploadResponse(
multipartInfoInitiateResponse);
break;
case ApplyInitiateMultiPartUpload:
MultipartInfoInitiateResponse response =
applyInitiateMultiPartUpload(
request.getInitiateMultiPartUploadApplyRequest());
responseBuilder.setInitiateMultiPartUploadResponse(
response);
break;
case CommitMultiPartUpload:
MultipartCommitUploadPartResponse commitUploadPartResponse =
commitMultipartUploadPart(
@ -498,6 +514,20 @@ public class OzoneManagerRequestHandler implements RequestHandler {
return resp.build();
}
private CreateKeyResponse applyCreateKey(ApplyCreateKeyRequest request)
throws IOException {
CreateKeyRequest createKeyRequest = request.getCreateKeyRequest();
CreateKeyResponse createKeyResponse = request.getCreateKeyResponse();
impl.applyOpenKey(createKeyRequest.getKeyArgs(),
createKeyResponse.getKeyInfo(), createKeyResponse.getID());
// If applying to om DB successful just return createKeyResponse.
return createKeyResponse;
}
private LookupKeyResponse lookupKey(LookupKeyRequest request)
throws IOException {
LookupKeyResponse.Builder resp =
@ -731,6 +761,30 @@ public class OzoneManagerRequestHandler implements RequestHandler {
return resp.build();
}
private MultipartInfoInitiateResponse applyInitiateMultiPartUpload(
MultipartInfoApplyInitiateRequest request) throws IOException {
MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
.newBuilder();
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setType(keyArgs.getType())
.setFactor(keyArgs.getFactor())
.build();
OmMultipartInfo multipartInfo =
impl.applyInitiateMultipartUpload(omKeyArgs,
request.getMultipartUploadID());
resp.setVolumeName(multipartInfo.getVolumeName());
resp.setBucketName(multipartInfo.getBucketName());
resp.setKeyName(multipartInfo.getKeyName());
resp.setMultipartUploadID(multipartInfo.getUploadID());
return resp.build();
}
private MultipartCommitUploadPartResponse commitMultipartUploadPart(
MultipartCommitUploadPartRequest request) throws IOException {
MultipartCommitUploadPartResponse.Builder resp =

View File

@ -251,7 +251,7 @@ public class TestOzoneManagerStateMachine {
// As the request failed, check for keyLocation and the transaction
// context error message
Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
Assert.assertEquals("Scm in Chill mode Status code "
Assert.assertEquals("Scm in Chill mode " + OMException.STATUS_CODE
+ OMException.ResultCodes.SCM_IN_CHILL_MODE,
transactionContext.getException().getMessage());
Assert.assertTrue(transactionContext.getException() instanceof IOException);