diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java index 053bc0f1ba7..7e0ed880996 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java @@ -24,6 +24,7 @@ public enum OMAction implements AuditAction { // WRITE Actions ALLOCATE_BLOCK, + ADD_ALLOCATE_BLOCK, ALLOCATE_KEY, COMMIT_KEY, CREATE_VOLUME, diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java new file mode 100644 index 00000000000..7390fe2cb68 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.protocol; + +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyLocation; + +import java.io.IOException; + +/** + * Protocol to talk to OM HA. These methods are needed only called from + * OmRequestHandler. + */ +public interface OzoneManagerHAProtocol { + + /** + * Add a allocate block, it is assumed that the client is having an open + * key session going on. This block will be appended to this open key session. + * This will be called only during HA enabled OM, as during HA we get an + * allocated Block information, and add that information to OM DB. + * + * In HA the flow for allocateBlock is in StartTransaction allocateBlock + * will be called which returns block information, and in the + * applyTransaction addAllocateBlock will be called to add the block + * information to DB. + * + * @param args the key to append + * @param clientID the client identification + * @param keyLocation key location given by allocateBlock + * @return an allocated block + * @throws IOException + */ + OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, + KeyLocation keyLocation) throws IOException; + + +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 0fee78c91ff..748702e8109 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -190,6 +190,7 @@ public interface OzoneManagerProtocol OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, ExcludeList excludeList) throws IOException; + /** * Look up for the container of an existing key. * diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerServerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerServerProtocol.java new file mode 100644 index 00000000000..6f58e2d7ebd --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerServerProtocol.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.protocol; + +/** + * This will be used in the OzoneManager Server, as few of the methods in + * OzoneManagerHAProtocol need not be exposed to Om clients. This interface + * extends both OzoneManagerHAProtocol and OzoneManagerProtocol. + */ +public interface OzoneManagerServerProtocol extends OzoneManagerProtocol, + OzoneManagerHAProtocol { +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index c06efdc2c11..49a3fe6626e 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -719,7 +719,6 @@ public final class OzoneManagerProtocolClientSideTranslatorPB .getAllocateBlockResponse(); return OmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation()); } - @Override public void commitKey(OmKeyArgs args, long clientId) throws IOException { diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 2f9410bb1e7..e720c3e7df6 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -628,6 +628,10 @@ message AllocateBlockRequest { required KeyArgs keyArgs = 1; required uint64 clientID = 2; optional hadoop.hdds.ExcludeListProto excludeList = 3; + // During HA on one of the OM nodes, we allocate block and send the + // AllocateBlockRequest with keyLocation set. If this is set, no need to + // call scm again in OM Ratis applyTransaction just append it to DB. + optional KeyLocation keyLocation = 4; } message AllocateBlockResponse { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java index 5361745de46..bcb35320b31 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java @@ -149,7 +149,8 @@ public class TestOmBlockVersioning { // this block will be appended to the latest version of version 2. OmKeyLocationInfo locationInfo = - ozoneManager.allocateBlock(keyArgs, openKey.getId(), new ExcludeList()); + ozoneManager.allocateBlock(keyArgs, openKey.getId(), + new ExcludeList()); List locationInfoList = openKey.getKeyInfo().getLatestVersionLocations() .getBlocksLatestVersionOnly(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 122312daa8a..1f502d6ed0d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.utils.BackgroundService; import java.io.IOException; @@ -75,6 +76,20 @@ public interface KeyManager { */ OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, ExcludeList excludeList) throws IOException; + + /** + * Ozone manager state machine call's this on an open key, to add allocated + * block to the tail of current block list of the open client. + * + * @param args the key to append + * @param clientID the client requesting block. + * @param keyLocation key location. + * @return the reference to the new block. + * @throws IOException + */ + OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, + OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException; + /** * Given the args of a key to put, write an open key entry to meta data. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 187509f81bb..bac886394e0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -63,6 +63,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OmPartInfo; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -120,6 +121,8 @@ public class KeyManagerImpl implements KeyManager { private final KeyProviderCryptoExtension kmsProvider; + private final boolean isRatisEnabled; + public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, OzoneBlockTokenSecretManager secretManager) { @@ -148,6 +151,9 @@ public class KeyManagerImpl implements KeyManager { HDDS_BLOCK_TOKEN_ENABLED, HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); this.kmsProvider = kmsProvider; + this.isRatisEnabled = conf.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); } @Override @@ -223,10 +229,41 @@ public class KeyManagerImpl implements KeyManager { } @Override - public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, - ExcludeList excludeList) - throws IOException { + public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, + OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException { Preconditions.checkNotNull(args); + Preconditions.checkNotNull(keyLocation); + + + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + validateBucket(volumeName, bucketName); + String openKey = metadataManager.getOpenKey( + volumeName, bucketName, keyName, clientID); + + OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey); + if (keyInfo == null) { + LOG.error("Allocate block for a key not in open status in meta store" + + " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID); + throw new OMException("Open Key not found", + OMException.ResultCodes.KEY_NOT_FOUND); + } + + OmKeyLocationInfo omKeyLocationInfo = + OmKeyLocationInfo.getFromProtobuf(keyLocation); + keyInfo.appendNewBlocks(Collections.singletonList(omKeyLocationInfo)); + keyInfo.updateModifcationTime(); + metadataManager.getOpenKeyTable().put(openKey, keyInfo); + return omKeyLocationInfo; + } + + @Override + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, + ExcludeList excludeList) throws IOException { + Preconditions.checkNotNull(args); + + String volumeName = args.getVolumeName(); String bucketName = args.getBucketName(); String keyName = args.getKeyName(); @@ -246,11 +283,16 @@ public class KeyManagerImpl implements KeyManager { // the same version List locationInfos = allocateBlock(keyInfo, excludeList, scmBlockSize); - keyInfo.appendNewBlocks(locationInfos); - keyInfo.updateModifcationTime(); - metadataManager.getOpenKeyTable().put(openKey, - keyInfo); + + // If om is not managing via ratis, write to db, otherwise write to DB + // will happen via ratis apply transaction. + if (!isRatisEnabled) { + keyInfo.appendNewBlocks(locationInfos); + keyInfo.updateModifcationTime(); + metadataManager.getOpenKeyTable().put(openKey, keyInfo); + } return locationInfos.get(0); + } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index 946b89867bb..fda46e3bd2a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -60,6 +60,7 @@ public class OMMetrics { private @Metric MutableCounterLong numVolumeLists; private @Metric MutableCounterLong numKeyCommits; private @Metric MutableCounterLong numAllocateBlockCalls; + private @Metric MutableCounterLong numAddAllocateBlockCalls; private @Metric MutableCounterLong numGetServiceLists; private @Metric MutableCounterLong numListS3Buckets; private @Metric MutableCounterLong numInitiateMultipartUploads; @@ -85,6 +86,7 @@ public class OMMetrics { private @Metric MutableCounterLong numVolumeListFails; private @Metric MutableCounterLong numKeyCommitFails; private @Metric MutableCounterLong numBlockAllocateCallFails; + private @Metric MutableCounterLong numAddAllocateBlockCallFails; private @Metric MutableCounterLong numGetServiceListFails; private @Metric MutableCounterLong numListS3BucketsFails; private @Metric MutableCounterLong numInitiateMultipartUploadFails; @@ -379,6 +381,14 @@ public class OMMetrics { numBlockAllocateCallFails.incr(); } + public void incNumAddAllocateBlockCalls() { + numAddAllocateBlockCalls.incr(); + } + + public void incNumAddAllocateBlockFails() { + numAddAllocateBlockCallFails.incr(); + } + public void incNumBucketListFails() { numBucketListFails.incr(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 43ad16a3e2f..05597628e36 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -72,6 +72,8 @@ import org.apache.hadoop.ozone.OzoneIllegalArgumentException; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -102,7 +104,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -205,7 +206,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate; */ @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) public final class OzoneManager extends ServiceRuntimeInfoImpl - implements OzoneManagerProtocol, OMMXBean, Auditor { + implements OzoneManagerServerProtocol, OMMXBean, Auditor { public static final Logger LOG = LoggerFactory.getLogger(OzoneManager.class); @@ -2036,6 +2037,35 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } } + + @Override + public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, + KeyLocation keyLocation) throws IOException { + if(isAclEnabled) { + checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE, + args.getVolumeName(), args.getBucketName(), args.getKeyName()); + } + boolean auditSuccess = true; + Map auditMap = (args == null) ? new LinkedHashMap<>() : + args.toAuditMap(); + auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID)); + try { + metrics.incNumAddAllocateBlockCalls(); + return keyManager.addAllocatedBlock(args, clientID, keyLocation); + } catch (Exception ex) { + metrics.incNumAddAllocateBlockFails(); + auditSuccess = false; + AUDIT.logWriteFailure(buildAuditMessageForFailure( + OMAction.ADD_ALLOCATE_BLOCK, auditMap, ex)); + throw ex; + } finally { + if(auditSuccess){ + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + OMAction.ADD_ALLOCATE_BLOCK, auditMap)); + } + } + } + /** * Lookup a key. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index a3cde3e9d52..01979e4dcd2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -40,7 +40,8 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMNodeDetails; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; @@ -81,7 +82,7 @@ public final class OzoneManagerRatisServer { private final RaftGroup raftGroup; private final RaftPeerId raftPeerId; - private final OzoneManagerProtocol ozoneManager; + private final OzoneManagerServerProtocol ozoneManager; private final ClientId clientId = ClientId.randomId(); private final ScheduledExecutorService scheduledRoleChecker; @@ -107,7 +108,8 @@ public final class OzoneManagerRatisServer { * @param raftPeers peer nodes in the raft ring * @throws IOException */ - private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om, + private OzoneManagerRatisServer(Configuration conf, + OzoneManagerServerProtocol om, String raftGroupIdStr, RaftPeerId localRaftPeerId, InetSocketAddress addr, List raftPeers) throws IOException { @@ -154,7 +156,7 @@ public final class OzoneManagerRatisServer { * Creates an instance of OzoneManagerRatisServer. */ public static OzoneManagerRatisServer newOMRatisServer( - Configuration ozoneConf, OzoneManagerProtocol om, + Configuration ozoneConf, OzoneManager om, OMNodeDetails omNodeDetails, List peerNodes) throws IOException { @@ -199,7 +201,7 @@ public final class OzoneManagerRatisServer { return new OzoneManagerStateMachine(this); } - public OzoneManagerProtocol getOzoneManager() { + public OzoneManagerServerProtocol getOzoneManager() { return ozoneManager; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index acbbd34ac85..590b359d169 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om.ratis; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; import java.io.IOException; @@ -26,12 +27,14 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis .ContainerStateMachine; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; +import org.apache.hadoop.ozone.protocolPB.RequestHandler; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; @@ -57,8 +60,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final OzoneManagerRatisServer omRatisServer; - private final OzoneManagerProtocol ozoneManager; - private final OzoneManagerRequestHandler handler; + private final OzoneManagerServerProtocol ozoneManager; + private RequestHandler handler; private RaftGroupId raftGroupId; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { @@ -105,6 +108,11 @@ public class OzoneManagerStateMachine extends BaseStateMachine { ctxt.setException(ioe); return ctxt; } + + if (omRequest.getCmdType() == + OzoneManagerProtocolProtos.Type.AllocateBlock) { + return handleAllocateBlock(raftClientRequest, omRequest); + } return TransactionContext.newBuilder() .setClientRequest(raftClientRequest) .setStateMachine(this) @@ -113,6 +121,66 @@ public class OzoneManagerStateMachine extends BaseStateMachine { .build(); } + /** + * Handle AllocateBlock Request, which needs a special handling. This + * request needs to be executed on the leader, where it connects to SCM and + * get Block information. + * @param raftClientRequest + * @param omRequest + * @return TransactionContext + */ + private TransactionContext handleAllocateBlock( + RaftClientRequest raftClientRequest, OMRequest omRequest) { + OMResponse omResponse = handler.handle(omRequest); + + + // If request is failed, no need to proceed further. + // Setting the exception with omResponse message and code. + + // TODO: the allocate block fails when scm is in chill mode or when scm is + // down, but that error is not correctly received in OM end, once that + // is fixed, we need to see how to handle this failure case or how we + // need to retry or how to handle this scenario. For other errors like + // KEY_NOT_FOUND, we don't need a retry/ + if (!omResponse.getSuccess()) { + TransactionContext transactionContext = TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .build(); + IOException ioe = new IOException(omResponse.getMessage() + + " Status code " + omResponse.getStatus()); + transactionContext.setException(ioe); + return transactionContext; + } + + + // Get original request + OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest = + omRequest.getAllocateBlockRequest(); + + // Create new AllocateBlockRequest with keyLocation set. + OzoneManagerProtocolProtos.AllocateBlockRequest newAllocateBlockRequest = + OzoneManagerProtocolProtos.AllocateBlockRequest.newBuilder(). + mergeFrom(allocateBlockRequest) + .setKeyLocation( + omResponse.getAllocateBlockResponse().getKeyLocation()).build(); + + OMRequest newOmRequest = omRequest.toBuilder().setCmdType( + OzoneManagerProtocolProtos.Type.AllocateBlock) + .setAllocateBlockRequest(newAllocateBlockRequest).build(); + + ByteString messageContent = ByteString.copyFrom(newOmRequest.toByteArray()); + + return TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .setLogData(messageContent) + .build(); + + } + /* * Apply a committed log entry to the state machine. */ @@ -169,4 +237,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine { return future; } + @VisibleForTesting + public void setHandler(RequestHandler handler) { + this.handler = handler; + } + + @VisibleForTesting + public void setRaftGroupId(RaftGroupId raftGroupId) { + this.raftGroupId = raftGroupId; + } + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 395cc42388c..72b4d12e730 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.protocolPB; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.exceptions.NotLeaderException; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -46,7 +46,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); private final OzoneManagerRatisServer omRatisServer; private final OzoneManagerRatisClient omRatisClient; - private final OzoneManagerRequestHandler handler; + private final RequestHandler handler; private final boolean isRatisEnabled; /** @@ -55,7 +55,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements * @param impl OzoneManagerProtocolPB */ public OzoneManagerProtocolServerSideTranslatorPB( - OzoneManagerProtocol impl, OzoneManagerRatisServer ratisServer, + OzoneManagerServerProtocol impl, OzoneManagerRatisServer ratisServer, OzoneManagerRatisClient ratisClient, boolean enableRatis) { handler = new OzoneManagerRequestHandler(impl); this.omRatisServer = ratisServer; @@ -89,7 +89,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements scope.close(); } } - /** * Submits request to OM's Ratis server. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 8d768421248..81bc512ec0d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.OmPartInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse; @@ -123,17 +123,18 @@ import org.slf4j.LoggerFactory; * Command Handler for OM requests. OM State Machine calls this handler for * deserializing the client request and sending it to OM. */ -public class OzoneManagerRequestHandler { +public class OzoneManagerRequestHandler implements RequestHandler { static final Logger LOG = LoggerFactory.getLogger(OzoneManagerRequestHandler.class); - private final OzoneManagerProtocol impl; + private final OzoneManagerServerProtocol impl; - public OzoneManagerRequestHandler(OzoneManagerProtocol om) { + public OzoneManagerRequestHandler(OzoneManagerServerProtocol om) { this.impl = om; } //TODO simplify it to make it shorter @SuppressWarnings("methodlength") + @Override public OMResponse handle(OMRequest request) { LOG.debug("Received OMRequest: {}, ", request); Type cmdType = request.getCmdType(); @@ -344,6 +345,7 @@ public class OzoneManagerRequestHandler { * @param omRequest client request to OM * @throws OMException thrown if required parameters are set to null. */ + @Override public void validateRequest(OMRequest omRequest) throws OMException { Type cmdType = omRequest.getCmdType(); if (cmdType == null) { @@ -627,9 +629,18 @@ public class OzoneManagerRequestHandler { .setBucketName(keyArgs.getBucketName()) .setKeyName(keyArgs.getKeyName()) .build(); - OmKeyLocationInfo newLocation = - impl.allocateBlock(omKeyArgs, request.getClientID(), - ExcludeList.getFromProtoBuf(request.getExcludeList())); + + OmKeyLocationInfo newLocation; + if (request.hasKeyLocation()) { + newLocation = + impl.addAllocatedBlock(omKeyArgs, request.getClientID(), + request.getKeyLocation()); + } else { + newLocation = + impl.allocateBlock(omKeyArgs, request.getClientID(), + ExcludeList.getFromProtoBuf(request.getExcludeList())); + } + resp.setKeyLocation(newLocation.getProtobuf()); return resp.build(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java new file mode 100644 index 00000000000..2cf8dea93e9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.ozone.protocolPB; + +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos. + OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos. + OMResponse; + +/** + * Handler to handle the OmRequests. + */ +public interface RequestHandler { + + + /** + * Handle the OmRequest, and returns OmResponse. + * @param request + * @return OmResponse + */ + OMResponse handle(OMRequest request); + + /** + * Validates that the incoming OM request has required parameters. + * TODO: Add more validation checks before writing the request to Ratis log. + * + * @param omRequest client request to OM + * @throws OMException thrown if required parameters are set to null. + */ + void validateRequest(OMRequest omRequest) throws OMException; + +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java new file mode 100644 index 00000000000..f98c8caee1c --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -0,0 +1,264 @@ +package org.apache.hadoop.ozone.om.ratis; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMNodeDetails; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .AllocateBlockRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; +import org.apache.hadoop.ozone.protocolPB.RequestHandler; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.statemachine.TransactionContext; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.UUID; + +import static org.mockito.Mockito.when; + +/** + * This class tests OzoneManagerStateMachine. + */ +public class TestOzoneManagerStateMachine { + + private OzoneConfiguration conf; + private OzoneManagerRatisServer omRatisServer; + private String omID; + private RequestHandler requestHandler; + private RaftGroupId raftGroupId; + private OzoneManagerStateMachine ozoneManagerStateMachine; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + + @Before + public void setup() throws Exception { + conf = new OzoneConfiguration(); + omID = UUID.randomUUID().toString(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + temporaryFolder.newFolder().toString()); + int ratisPort = conf.getInt( + OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, + OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT); + InetSocketAddress rpcAddress = new InetSocketAddress( + InetAddress.getLocalHost(), 0); + OMNodeDetails omNodeDetails = new OMNodeDetails.Builder() + .setRpcAddress(rpcAddress) + .setRatisPort(ratisPort) + .setOMNodeId(omID) + .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT) + .build(); + // Starts a single node Ratis server + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null, + omNodeDetails, Collections.emptyList()); + + + ozoneManagerStateMachine = + new OzoneManagerStateMachine(omRatisServer); + + requestHandler = Mockito.mock(OzoneManagerRequestHandler.class); + raftGroupId = omRatisServer.getRaftGroup().getGroupId(); + + ozoneManagerStateMachine.setHandler(requestHandler); + ozoneManagerStateMachine.setRaftGroupId(raftGroupId); + + } + + @Test + public void testAllocateBlockRequestWithSuccess() throws Exception { + + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + long allocateBlockClientId = RandomUtils.nextLong(); + String clientId = UUID.randomUUID().toString(); + + + OMRequest omRequest = createOmRequestForAllocateBlock(volumeName, + bucketName, keyName, allocateBlockClientId, clientId); + + OzoneManagerProtocolProtos.OMResponse omResponse = + createOmResponseForAllocateBlock(true); + + when(requestHandler.handle(omRequest)).thenReturn(omResponse); + + + RaftClientRequest raftClientRequest = + new RaftClientRequest(ClientId.randomId(), + RaftPeerId.valueOf("random"), raftGroupId, 1, 1, + Message.valueOf( + OMRatisHelper.convertRequestToByteString(omRequest)), + RaftClientRequest.Type.valueOf( + RaftProtos.WriteRequestTypeProto.getDefaultInstance())); + + TransactionContext transactionContext = + ozoneManagerStateMachine.startTransaction(raftClientRequest); + + + OMRequest newOmRequest = OMRatisHelper.convertByteStringToOMRequest( + transactionContext.getStateMachineLogEntry().getLogData()); + + Assert.assertTrue(newOmRequest.hasAllocateBlockRequest()); + checkModifiedOmRequest(omRequest, newOmRequest); + + // Check this keyLocation, and the keyLocation is same as from OmResponse. + Assert.assertTrue(newOmRequest.getAllocateBlockRequest().hasKeyLocation()); + + Assert.assertEquals(omResponse.getAllocateBlockResponse().getKeyLocation(), + newOmRequest.getAllocateBlockRequest().getKeyLocation()); + + } + + + private OMRequest createOmRequestForAllocateBlock(String volumeName, + String bucketName, String keyName, long allocateClientId, + String clientId) { + //Create AllocateBlockRequest + AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder(); + KeyArgs keyArgs = KeyArgs.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(100).build(); + req.setKeyArgs(keyArgs); + req.setClientID(allocateClientId); + req.setExcludeList(HddsProtos.ExcludeListProto.getDefaultInstance()); + return OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock) + .setAllocateBlockRequest(req) + .setClientId(clientId) + .build(); + } + + private OMResponse createOmResponseForAllocateBlock(boolean status) { + OmKeyLocationInfo newLocation = new OmKeyLocationInfo.Builder().setBlockID( + new BlockID(RandomUtils.nextLong(), + RandomUtils.nextLong())) + .setLength(RandomUtils.nextLong()) + .setOffset(0).setPipeline( + Pipeline.newBuilder().setId(PipelineID.randomId()) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE) + .setState(Pipeline.PipelineState.OPEN) + .setNodes(new ArrayList<>()).build()).build(); + + OzoneManagerProtocolProtos.AllocateBlockResponse.Builder resp = + OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder(); + resp.setKeyLocation(newLocation.getProtobuf()); + + + if (status) { + return OzoneManagerProtocolProtos.OMResponse.newBuilder().setSuccess(true) + .setAllocateBlockResponse(resp) + .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock) + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .setSuccess(status).build(); + } else { + return OzoneManagerProtocolProtos.OMResponse.newBuilder().setSuccess(true) + .setAllocateBlockResponse(resp) + .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock) + .setStatus(OzoneManagerProtocolProtos.Status.SCM_IN_CHILL_MODE) + .setMessage("Scm in Chill mode") + .setSuccess(status).build(); + } + + } + @Test + public void testAllocateBlockWithFailure() throws Exception{ + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + long allocateBlockClientId = RandomUtils.nextLong(); + String clientId = UUID.randomUUID().toString(); + + + OMRequest omRequest = createOmRequestForAllocateBlock(volumeName, + bucketName, keyName, allocateBlockClientId, clientId); + + OzoneManagerProtocolProtos.OMResponse omResponse = + createOmResponseForAllocateBlock(false); + + when(requestHandler.handle(omRequest)).thenReturn(omResponse); + + + RaftClientRequest raftClientRequest = + new RaftClientRequest(ClientId.randomId(), + RaftPeerId.valueOf("random"), raftGroupId, 1, 1, + Message.valueOf( + OMRatisHelper.convertRequestToByteString(omRequest)), + RaftClientRequest.Type.valueOf( + RaftProtos.WriteRequestTypeProto.getDefaultInstance())); + + TransactionContext transactionContext = + ozoneManagerStateMachine.startTransaction(raftClientRequest); + + + OMRequest newOmRequest = OMRatisHelper.convertByteStringToOMRequest( + transactionContext.getStateMachineLogEntry().getLogData()); + + Assert.assertTrue(newOmRequest.hasAllocateBlockRequest()); + checkModifiedOmRequest(omRequest, newOmRequest); + + // As the request failed, check for keyLocation and the transaction + // context error message + Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation()); + Assert.assertEquals("Scm in Chill mode Status code " + + OMException.ResultCodes.SCM_IN_CHILL_MODE, + transactionContext.getException().getMessage()); + Assert.assertTrue(transactionContext.getException() instanceof IOException); + + } + + private void checkModifiedOmRequest(OMRequest omRequest, + OMRequest newOmRequest) { + Assert.assertTrue(newOmRequest.getAllocateBlockRequest() + .getKeyArgs().getBucketName().equals( + omRequest.getAllocateBlockRequest().getKeyArgs().getBucketName())); + Assert.assertTrue(omRequest.getAllocateBlockRequest() + .getKeyArgs().getVolumeName().equals( + newOmRequest.getAllocateBlockRequest().getKeyArgs() + .getVolumeName())); + Assert.assertTrue(omRequest.getAllocateBlockRequest() + .getKeyArgs().getKeyName().equals( + newOmRequest.getAllocateBlockRequest().getKeyArgs().getKeyName())); + Assert.assertEquals(omRequest.getAllocateBlockRequest() + .getKeyArgs().getDataSize(), + newOmRequest.getAllocateBlockRequest().getKeyArgs().getDataSize()); + Assert.assertEquals(omRequest.getAllocateBlockRequest() + .getClientID(), + newOmRequest.getAllocateBlockRequest().getClientID()); + Assert.assertEquals(omRequest.getClientId(), newOmRequest.getClientId()); + } +}