diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 181c5f40c02..a9351f72721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1035,8 +1035,6 @@ public class RMAppImpl implements RMApp, Recoverable { app.submissionContext.getCancelTokensWhenComplete(), app.getUser(), BuilderUtils.parseTokensConf(app.submissionContext)); - // set the memory free - app.submissionContext.getAMContainerSpec().setTokensConf(null); } catch (Exception e) { String msg = "Failed to fetch user credentials from application:" + e.getMessage(); @@ -1089,8 +1087,6 @@ public class RMAppImpl implements RMApp, Recoverable { app.submissionContext, false, app.applicationPriority)); // send the ATS create Event app.sendATSCreateEvent(); - // Set the memory free after submission context is persisted - app.submissionContext.getAMContainerSpec().setTokensConf(null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 98e96f5bb45..90507f34686 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -513,6 +513,17 @@ public class MockRM extends ResourceManager { false, false, null, 0, null, true, priority); } + public RMApp submitApp(int masterMemory, Priority priority, + Credentials cred, ByteBuffer tokensConf) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp(resource, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true, + false, false, null, 0, null, true, priority, null, null, + tokensConf); + } + public RMApp submitApp(int masterMemory, boolean unmanaged) throws Exception { return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 0587cd88be4..bddb9824f06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -59,6 +59,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,6 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -2517,4 +2520,116 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { rm2.stop(); } } + + @Test(timeout = 20000) + public void testRMRestartAfterPriorityChangesInAllocatedResponse() + throws Exception { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + 10); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + false); + + //Start RM + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MockRM rm = new TestSecurityMockRM(conf); + rm.start(); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024); + + // Submit an application + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = rm.submitApp(2048, appPriority1, + getCreds(), getTokensConf()); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList(); + List ask = new ArrayList(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + AllocateResponse response1 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority1, response1.getApplicationPriority()); + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + rm.getRMAppManager().updateApplicationPriority(ugi, + app1.getApplicationId(), appPriority2); + + AllocateResponse response2 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority2, response2.getApplicationPriority()); + + /* + * Ensure tokensConf has been retained even after UPDATE_APP event in + * RMStateStore, which gets triggered because of change in priority. + * + */ + Map rmAppState = + memStore.getState().getApplicationState(); + ApplicationStateData appState = + rmAppState.get(app1.getApplicationId()); + Assert.assertEquals(getTokensConf(), + appState.getApplicationSubmissionContext(). + getAMContainerSpec().getTokensConf()); + + + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + AllocateResponse response3 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority2, response3.getApplicationPriority()); + + /* + * Ensure tokensConf has been retained even after RECOVER event in + * RMStateStore, which gets triggered as part of RM START. + */ + Map rmAppStateNew = + memStore.getState().getApplicationState(); + ApplicationStateData appStateNew = + rmAppStateNew.get(app1.getApplicationId()); + Assert.assertEquals(getTokensConf(), + appStateNew.getApplicationSubmissionContext(). + getAMContainerSpec().getTokensConf()); + + rm.stop(); + rm2.stop(); + } + + private Credentials getCreds() throws IOException { + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + return ts; + } + + private ByteBuffer getTokensConf() throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + Configuration appConf = new Configuration(false); + appConf.clear(); + appConf.set("dfs.nameservices", "mycluster1,mycluster2"); + appConf.set("dfs.namenode.rpc-address.mycluster2.nn1", + "123.0.0.1"); + appConf.set("dfs.namenode.rpc-address.mycluster3.nn2", + "123.0.0.2"); + appConf.write(dob); + ByteBuffer tokenConf = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + return tokenConf; + } }