From 2e997d818d00ff9ca868afd895648fdaa380922d Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 23 Feb 2023 04:37:35 +0800 Subject: [PATCH] YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126) --- .../impl/MemoryFederationStateStore.java | 112 ++++++++++-------- .../impl/FederationStateStoreBaseTest.java | 19 +-- 2 files changed, 66 insertions(+), 65 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index b91de3ae808..03fb19a173d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -107,7 +108,7 @@ import static org.apache.hadoop.yarn.server.federation.store.utils.FederationSta public class MemoryFederationStateStore implements FederationStateStore { private Map membership; - private Map applications; + private Map applications; private Map reservations; private Map policies; private RouterRMDTSecretManagerState routerRMSecretManagerState; @@ -122,10 +123,10 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public void init(Configuration conf) { - membership = new ConcurrentHashMap(); - applications = new ConcurrentHashMap(); - reservations = new ConcurrentHashMap(); - policies = new ConcurrentHashMap(); + membership = new ConcurrentHashMap<>(); + applications = new ConcurrentHashMap<>(); + reservations = new ConcurrentHashMap<>(); + policies = new ConcurrentHashMap<>(); routerRMSecretManagerState = new RouterRMDTSecretManagerState(); maxAppsInStateStore = conf.getInt( YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, @@ -143,14 +144,15 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public SubClusterRegisterResponse registerSubCluster( - SubClusterRegisterRequest request) throws YarnException { + public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request) + throws YarnException { + long startTime = clock.getTime(); + FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); long currentTime = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - SubClusterInfo subClusterInfoToSave = SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(), subClusterInfo.getAMRMServiceAddress(), @@ -161,18 +163,21 @@ public class MemoryFederationStateStore implements FederationStateStore { subClusterInfo.getCapability()); membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave); + long stopTime = clock.getTime(); + + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); return SubClusterRegisterResponse.newInstance(); } @Override - public SubClusterDeregisterResponse deregisterSubCluster( - SubClusterDeregisterRequest request) throws YarnException { + public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request) + throws YarnException { + FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); if (subClusterInfo == null) { - String errMsg = - "SubCluster " + request.getSubClusterId().toString() + " not found"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException( + LOG, "SubCluster %s not found", request.getSubClusterId()); } else { subClusterInfo.setState(request.getState()); } @@ -181,17 +186,16 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public SubClusterHeartbeatResponse subClusterHeartbeat( - SubClusterHeartbeatRequest request) throws YarnException { + public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request) + throws YarnException { FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); SubClusterInfo subClusterInfo = membership.get(subClusterId); if (subClusterInfo == null) { - String errMsg = "SubCluster " + subClusterId.toString() - + " does not exist; cannot heartbeat"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException( + LOG, "SubCluster %s does not exist; cannot heartbeat.", request.getSubClusterId()); } long currentTime = @@ -205,11 +209,12 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public GetSubClusterInfoResponse getSubCluster( - GetSubClusterInfoRequest request) throws YarnException { + public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request) + throws YarnException { FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); + if (!membership.containsKey(subClusterId)) { LOG.warn("The queried SubCluster: {} does not exist.", subClusterId); return null; @@ -219,16 +224,17 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public GetSubClustersInfoResponse getSubClusters( - GetSubClustersInfoRequest request) throws YarnException { - List result = new ArrayList(); + public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request) + throws YarnException { + + List result = new ArrayList<>(); for (SubClusterInfo info : membership.values()) { - if (!request.getFilterInactiveSubClusters() - || info.getState().isActive()) { + if (!request.getFilterInactiveSubClusters() || info.getState().isActive()) { result.add(info); } } + return GetSubClustersInfoResponse.newInstance(result); } @@ -239,16 +245,16 @@ public class MemoryFederationStateStore implements FederationStateStore { AddApplicationHomeSubClusterRequest request) throws YarnException { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); - ApplicationId appId = - request.getApplicationHomeSubCluster().getApplicationId(); + ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster(); + + ApplicationId appId = homeSubCluster.getApplicationId(); if (!applications.containsKey(appId)) { - applications.put(appId, - request.getApplicationHomeSubCluster().getHomeSubCluster()); + applications.put(appId, homeSubCluster); } - return AddApplicationHomeSubClusterResponse - .newInstance(applications.get(appId)); + ApplicationHomeSubCluster respHomeSubCluster = applications.get(appId); + return AddApplicationHomeSubClusterResponse.newInstance(respHomeSubCluster.getHomeSubCluster()); } @Override @@ -256,15 +262,16 @@ public class MemoryFederationStateStore implements FederationStateStore { UpdateApplicationHomeSubClusterRequest request) throws YarnException { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); + if (!applications.containsKey(appId)) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", appId); } - applications.put(appId, - request.getApplicationHomeSubCluster().getHomeSubCluster()); + applications.put(appId, request.getApplicationHomeSubCluster()); return UpdateApplicationHomeSubClusterResponse.newInstance(); } @@ -275,11 +282,12 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", appId); } - return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId)); + return GetApplicationHomeSubClusterResponse.newInstance(appId, + applications.get(appId).getHomeSubCluster()); } @Override @@ -303,7 +311,7 @@ public class MemoryFederationStateStore implements FederationStateStore { } private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) { - SubClusterId subClusterId = applications.get(applicationId); + SubClusterId subClusterId = applications.get(applicationId).getHomeSubCluster(); return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); } @@ -314,8 +322,8 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", appId); } applications.remove(appId); @@ -329,12 +337,11 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationPolicyStoreInputValidator.validate(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { - LOG.warn("Policy for queue: {} does not exist.", queue); + LOG.warn("Policy for queue : {} does not exist.", queue); return null; } - return GetSubClusterPolicyConfigurationResponse - .newInstance(policies.get(queue)); + return GetSubClusterPolicyConfigurationResponse.newInstance(policies.get(queue)); } @Override @@ -350,8 +357,7 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { - ArrayList result = - new ArrayList(); + ArrayList result = new ArrayList<>(); for (SubClusterPolicyConfiguration policy : policies.values()) { result.add(policy); } @@ -386,7 +392,8 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationReservationHomeSubClusterStoreInputValidator.validate(request); ReservationId reservationId = request.getReservationId(); if (!reservations.containsKey(reservationId)) { - throw new YarnException("Reservation " + reservationId + " does not exist"); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist.", reservationId); } SubClusterId subClusterId = reservations.get(reservationId); ReservationHomeSubCluster homeSubCluster = @@ -417,7 +424,8 @@ public class MemoryFederationStateStore implements FederationStateStore { ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId(); if (!reservations.containsKey(reservationId)) { - throw new YarnException("Reservation " + reservationId + " does not exist."); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist.", reservationId); } SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster(); @@ -431,7 +439,8 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationReservationHomeSubClusterStoreInputValidator.validate(request); ReservationId reservationId = request.getReservationId(); if (!reservations.containsKey(reservationId)) { - throw new YarnException("Reservation " + reservationId + " does not exist"); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist.", reservationId); } reservations.remove(reservationId); return DeleteReservationHomeSubClusterResponse.newInstance(); @@ -446,9 +455,8 @@ public class MemoryFederationStateStore implements FederationStateStore { Set rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState(); if (rmDTMasterKeyState.contains(delegationKey)) { - LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId()); - throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() + - " is already stored"); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Error storing info for RMDTMasterKey with keyID: %s.", delegationKey.getKeyId()); } routerRMSecretManagerState.getMasterKeyState().add(delegationKey); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index c93115ccfd3..1952d47cb54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -164,12 +164,9 @@ public abstract class FederationStateStoreBaseTest { SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); - try { - stateStore.deregisterSubCluster(deregisterRequest); - Assert.fail(); - } catch (FederationStateStoreException e) { - Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); - } + + LambdaTestUtils.intercept(YarnException.class, + "SubCluster SC not found", () -> stateStore.deregisterSubCluster(deregisterRequest)); } @Test @@ -266,13 +263,9 @@ public abstract class FederationStateStoreBaseTest { SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability"); - try { - stateStore.subClusterHeartbeat(heartbeatRequest); - Assert.fail(); - } catch (FederationStateStoreException e) { - Assert.assertTrue(e.getMessage() - .startsWith("SubCluster SC does not exist; cannot heartbeat")); - } + LambdaTestUtils.intercept(YarnException.class, + "SubCluster SC does not exist; cannot heartbeat", + () -> stateStore.subClusterHeartbeat(heartbeatRequest)); } // Test FederationApplicationHomeSubClusterStore