diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 7a1511496cd..63a8aa62f00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -37,11 +37,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationHomeSubClusterProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -94,6 +96,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenReque import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.ApplicationHomeSubClusterPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; @@ -323,49 +326,76 @@ public class ZookeeperFederationStateStore implements FederationStateStore { long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); - ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); - ApplicationId appId = app.getApplicationId(); + + // parse parameters + // We need to get applicationId, subClusterId, appSubmissionContext 3 parameters. + ApplicationHomeSubCluster requestApplicationHomeSubCluster = + request.getApplicationHomeSubCluster(); + ApplicationId requestApplicationId = requestApplicationHomeSubCluster.getApplicationId(); + SubClusterId requestSubClusterId = requestApplicationHomeSubCluster.getHomeSubCluster(); + ApplicationSubmissionContext requestApplicationSubmissionContext = + requestApplicationHomeSubCluster.getApplicationSubmissionContext(); + + LOG.debug("applicationId = {}, subClusterId = {}, appSubmissionContext = {}.", + requestApplicationId, requestSubClusterId, requestApplicationSubmissionContext); // Try to write the subcluster - SubClusterId homeSubCluster = app.getHomeSubCluster(); try { - putApp(appId, homeSubCluster, false); + storeOrUpdateApplicationHomeSubCluster(requestApplicationId, + requestApplicationHomeSubCluster, false); } catch (Exception e) { - String errMsg = "Cannot add application home subcluster for " + appId; + String errMsg = "Cannot add application home subcluster for " + requestApplicationId; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } // Check for the actual subcluster try { - homeSubCluster = getApp(appId); + // We try to get the ApplicationHomeSubCluster actually stored in ZK + // according to the applicationId. + ApplicationHomeSubCluster actualAppHomeSubCluster = + getApplicationHomeSubCluster(requestApplicationId); + SubClusterId responseSubClusterId = actualAppHomeSubCluster.getHomeSubCluster(); + long end = clock.getTime(); + opDurations.addAppHomeSubClusterDuration(start, end); + return AddApplicationHomeSubClusterResponse.newInstance(responseSubClusterId); } catch (Exception e) { - String errMsg = "Cannot check app home subcluster for " + appId; + String errMsg = "Cannot check app home subcluster for " + requestApplicationId; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - long end = clock.getTime(); - opDurations.addAppHomeSubClusterDuration(start, end); - return AddApplicationHomeSubClusterResponse - .newInstance(homeSubCluster); + + // Throw YarnException. + throw new YarnException("Cannot addApplicationHomeSubCluster by request"); } @Override - public UpdateApplicationHomeSubClusterResponse - updateApplicationHomeSubCluster( - UpdateApplicationHomeSubClusterRequest request) - throws YarnException { + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); - ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); - ApplicationId appId = app.getApplicationId(); - SubClusterId homeSubCluster = getApp(appId); - if (homeSubCluster == null) { - String errMsg = "Application " + appId + " does not exist"; + ApplicationHomeSubCluster requestApplicationHomeSubCluster = + request.getApplicationHomeSubCluster(); + ApplicationId requestApplicationId = requestApplicationHomeSubCluster.getApplicationId(); + ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster = + getApplicationHomeSubCluster(requestApplicationId); + + if (zkStoreApplicationHomeSubCluster == null) { + String errMsg = "Application " + requestApplicationId + " does not exist"; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - SubClusterId newSubClusterId = - request.getApplicationHomeSubCluster().getHomeSubCluster(); - putApp(appId, newSubClusterId, true); + + SubClusterId oldSubClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster(); + SubClusterId newSubClusterId = requestApplicationHomeSubCluster.getHomeSubCluster(); + ApplicationSubmissionContext requestApplicationSubmissionContext = + requestApplicationHomeSubCluster.getApplicationSubmissionContext(); + + LOG.debug("applicationId = {}, oldHomeSubClusterId = {}, newHomeSubClusterId = {}, " + + "appSubmissionContext = {}.", requestApplicationId, oldSubClusterId, newSubClusterId, + requestApplicationSubmissionContext); + + // update stored ApplicationHomeSubCluster + storeOrUpdateApplicationHomeSubCluster(requestApplicationId, + requestApplicationHomeSubCluster, true); long end = clock.getTime(); opDurations.addUpdateAppHomeSubClusterDuration(start, end); @@ -378,15 +408,33 @@ public class ZookeeperFederationStateStore implements FederationStateStore { long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); - ApplicationId appId = request.getApplicationId(); - SubClusterId homeSubCluster = getApp(appId); - if (homeSubCluster == null) { - String errMsg = "Application " + appId + " does not exist"; + ApplicationId requestApplicationId = request.getApplicationId(); + + ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster = + getApplicationHomeSubCluster(requestApplicationId); + if (zkStoreApplicationHomeSubCluster == null) { + String errMsg = "Application " + requestApplicationId + " does not exist"; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + + // Prepare to return data + SubClusterId subClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster(); + long createTime = zkStoreApplicationHomeSubCluster.getCreateTime(); + long end = clock.getTime(); opDurations.addGetAppHomeSubClusterDuration(start, end); - return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster); + + // If the request asks for an ApplicationSubmissionContext to be returned, + // we will return + if (request.getContainsAppSubmissionContext()) { + ApplicationSubmissionContext submissionContext = + zkStoreApplicationHomeSubCluster.getApplicationSubmissionContext(); + return GetApplicationHomeSubClusterResponse.newInstance( + requestApplicationId, subClusterId, createTime, submissionContext); + } + + return GetApplicationHomeSubClusterResponse.newInstance(requestApplicationId, + subClusterId, createTime); } @Override @@ -421,13 +469,18 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private ApplicationHomeSubCluster generateAppHomeSC(String appId) { try { + // Parse ApplicationHomeSubCluster ApplicationId applicationId = ApplicationId.fromString(appId); - SubClusterId homeSubCluster = getApp(applicationId); - ApplicationHomeSubCluster app = - ApplicationHomeSubCluster.newInstance(applicationId, homeSubCluster); - return app; + ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster = + getApplicationHomeSubCluster(applicationId); + + // Prepare to return data + SubClusterId subClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster(); + ApplicationHomeSubCluster resultApplicationHomeSubCluster = + ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); + return resultApplicationHomeSubCluster; } catch (Exception ex) { - LOG.error("get homeSubCluster by appId = {}.", appId); + LOG.error("get homeSubCluster by appId = {}.", appId, ex); } return null; } @@ -674,39 +727,43 @@ public class ZookeeperFederationStateStore implements FederationStateStore { /** * Get the subcluster for an application. + * * @param appId Application identifier. - * @return Subcluster identifier. + * @return ApplicationHomeSubCluster identifier. * @throws Exception If it cannot contact ZooKeeper. */ - private SubClusterId getApp(final ApplicationId appId) throws YarnException { + private ApplicationHomeSubCluster getApplicationHomeSubCluster( + final ApplicationId appId) throws YarnException { String appZNode = getNodePath(appsZNode, appId.toString()); - SubClusterId subClusterId = null; + ApplicationHomeSubCluster appHomeSubCluster = null; byte[] data = get(appZNode); if (data != null) { try { - subClusterId = new SubClusterIdPBImpl( - SubClusterIdProto.parseFrom(data)); + appHomeSubCluster = new ApplicationHomeSubClusterPBImpl( + ApplicationHomeSubClusterProto.parseFrom(data)); } catch (InvalidProtocolBufferException e) { String errMsg = "Cannot parse application at " + appZNode; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } } - return subClusterId; + return appHomeSubCluster; } /** - * Put an application. - * @param appId Application identifier. - * @param subClusterId Subcluster identifier. + * We will store the data of ApplicationHomeSubCluster according to appId. + * + * @param applicationId ApplicationId. + * @param applicationHomeSubCluster ApplicationHomeSubCluster. + * @param update false, add records; true, update records. * @throws Exception If it cannot contact ZooKeeper. */ - private void putApp(final ApplicationId appId, - final SubClusterId subClusterId, boolean update) - throws YarnException { - String appZNode = getNodePath(appsZNode, appId.toString()); - SubClusterIdProto proto = - ((SubClusterIdPBImpl)subClusterId).getProto(); + private void storeOrUpdateApplicationHomeSubCluster(final ApplicationId applicationId, + final ApplicationHomeSubCluster applicationHomeSubCluster, boolean update) + throws YarnException { + String appZNode = getNodePath(appsZNode, applicationId.toString()); + ApplicationHomeSubClusterProto proto = + ((ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster).getProto(); byte[] data = proto.toByteArray(); put(appZNode, data, update); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 299e7001c9e..cb2059f0261 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -406,7 +406,7 @@ public abstract class FederationStateStoreBaseTest { ApplicationId appId1 = ApplicationId.newInstance(1, 1); SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); ApplicationHomeSubCluster ahsc1 = - ApplicationHomeSubCluster.newInstance(appId1, subClusterId1); + ApplicationHomeSubCluster.newInstance(appId1, subClusterId1); ApplicationId appId2 = ApplicationId.newInstance(1, 2); SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); @@ -470,6 +470,7 @@ public abstract class FederationStateStoreBaseTest { Assert.assertEquals(10, items.size()); for (ApplicationHomeSubCluster item : items) { + appHomeSubClusters.contains(item); Assert.assertTrue(appHomeSubClusters.contains(item)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index ba22a1e1894..e07cb4fe474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -33,6 +33,10 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsRecords; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -41,6 +45,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Before; @@ -276,4 +284,29 @@ public class TestZookeeperFederationStateStore extends FederationStateStoreBaseT assertNotNull(zkRouterStoreToken); assertEquals(token, zkRouterStoreToken); } + + @Test + public void testGetApplicationHomeSubClusterWithContext() throws Exception { + ZookeeperFederationStateStore zkFederationStateStore = + (ZookeeperFederationStateStore) this.getStateStore(); + + ApplicationId appId = ApplicationId.newInstance(1, 3); + SubClusterId subClusterId = SubClusterId.newInstance("SC"); + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance(appId, "test", "default", + Priority.newInstance(0), null, true, true, + 2, Resource.newInstance(10, 2), "test"); + addApplicationHomeSC(appId, subClusterId, context); + + GetApplicationHomeSubClusterRequest getRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId, true); + GetApplicationHomeSubClusterResponse result = + zkFederationStateStore.getApplicationHomeSubCluster(getRequest); + + ApplicationHomeSubCluster applicationHomeSubCluster = result.getApplicationHomeSubCluster(); + + assertEquals(appId, applicationHomeSubCluster.getApplicationId()); + assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster()); + assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext()); + } } \ No newline at end of file