YARN-6982. Potential issue on setting AMContainerSpec#tokenConf to null before app is completed. Contributed by Manikandan R.
This commit is contained in:
parent
f3661fd08e
commit
4cae120c61
|
@ -1086,8 +1086,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
app.submissionContext.getCancelTokensWhenComplete(),
|
||||
app.getUser(),
|
||||
BuilderUtils.parseTokensConf(app.submissionContext));
|
||||
// set the memory free
|
||||
app.submissionContext.getAMContainerSpec().setTokensConf(null);
|
||||
} catch (Exception e) {
|
||||
String msg = "Failed to fetch user credentials from application:"
|
||||
+ e.getMessage();
|
||||
|
@ -1140,8 +1138,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
app.submissionContext, false, app.applicationPriority));
|
||||
// send the ATS create Event
|
||||
app.sendATSCreateEvent();
|
||||
// Set the memory free after submission context is persisted
|
||||
app.submissionContext.getAMContainerSpec().setTokensConf(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -513,6 +513,17 @@ public class MockRM extends ResourceManager {
|
|||
false, false, null, 0, null, true, priority);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, Priority priority,
|
||||
Credentials cred, ByteBuffer tokensConf) throws Exception {
|
||||
Resource resource = Resource.newInstance(masterMemory, 0);
|
||||
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
||||
.getShortUserName(), null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true,
|
||||
false, false, null, 0, null, true, priority, null, null,
|
||||
tokensConf);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, boolean unmanaged)
|
||||
throws Exception {
|
||||
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
|
@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -107,6 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
|
@ -2583,4 +2586,116 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
rm2.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testRMRestartAfterPriorityChangesInAllocatedResponse()
|
||||
throws Exception {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
// Set Max Application Priority as 10
|
||||
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
||||
10);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||
false);
|
||||
|
||||
//Start RM
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MockRM rm = new TestSecurityMockRM(conf);
|
||||
rm.start();
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore();
|
||||
|
||||
// Register node1
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024);
|
||||
|
||||
// Submit an application
|
||||
Priority appPriority1 = Priority.newInstance(5);
|
||||
RMApp app1 = rm.submitApp(2048, appPriority1,
|
||||
getCreds(), getTokensConf());
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
|
||||
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
allocateRequest.setReleaseList(release);
|
||||
allocateRequest.setAskList(ask);
|
||||
|
||||
AllocateResponse response1 = am1.allocate(allocateRequest);
|
||||
Assert.assertEquals(appPriority1, response1.getApplicationPriority());
|
||||
|
||||
// Change the priority of App1 to 8
|
||||
Priority appPriority2 = Priority.newInstance(8);
|
||||
UserGroupInformation ugi = UserGroupInformation
|
||||
.createRemoteUser(app1.getUser());
|
||||
rm.getRMAppManager().updateApplicationPriority(ugi,
|
||||
app1.getApplicationId(), appPriority2);
|
||||
|
||||
AllocateResponse response2 = am1.allocate(allocateRequest);
|
||||
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
|
||||
|
||||
/*
|
||||
* Ensure tokensConf has been retained even after UPDATE_APP event in
|
||||
* RMStateStore, which gets triggered because of change in priority.
|
||||
*
|
||||
*/
|
||||
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||
memStore.getState().getApplicationState();
|
||||
ApplicationStateData appState =
|
||||
rmAppState.get(app1.getApplicationId());
|
||||
Assert.assertEquals(getTokensConf(),
|
||||
appState.getApplicationSubmissionContext().
|
||||
getAMContainerSpec().getTokensConf());
|
||||
|
||||
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
||||
rm2.start();
|
||||
|
||||
AllocateResponse response3 = am1.allocate(allocateRequest);
|
||||
Assert.assertEquals(appPriority2, response3.getApplicationPriority());
|
||||
|
||||
/*
|
||||
* Ensure tokensConf has been retained even after RECOVER event in
|
||||
* RMStateStore, which gets triggered as part of RM START.
|
||||
*/
|
||||
Map<ApplicationId, ApplicationStateData> rmAppStateNew =
|
||||
memStore.getState().getApplicationState();
|
||||
ApplicationStateData appStateNew =
|
||||
rmAppStateNew.get(app1.getApplicationId());
|
||||
Assert.assertEquals(getTokensConf(),
|
||||
appStateNew.getApplicationSubmissionContext().
|
||||
getAMContainerSpec().getTokensConf());
|
||||
|
||||
rm.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
private Credentials getCreds() throws IOException {
|
||||
Credentials ts = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
return ts;
|
||||
}
|
||||
|
||||
private ByteBuffer getTokensConf() throws IOException {
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
Configuration appConf = new Configuration(false);
|
||||
appConf.clear();
|
||||
appConf.set("dfs.nameservices", "mycluster1,mycluster2");
|
||||
appConf.set("dfs.namenode.rpc-address.mycluster2.nn1",
|
||||
"123.0.0.1");
|
||||
appConf.set("dfs.namenode.rpc-address.mycluster3.nn2",
|
||||
"123.0.0.2");
|
||||
appConf.write(dob);
|
||||
ByteBuffer tokenConf =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
return tokenConf;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue