YARN-6982. Potential issue on setting AMContainerSpec#tokenConf to null before app is completed. Contributed by Manikandan R.
(cherry picked from commit 4cae120c61
)
This commit is contained in:
parent
870ef0b2fd
commit
877d96b83d
|
@ -1035,8 +1035,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
app.submissionContext.getCancelTokensWhenComplete(),
|
app.submissionContext.getCancelTokensWhenComplete(),
|
||||||
app.getUser(),
|
app.getUser(),
|
||||||
BuilderUtils.parseTokensConf(app.submissionContext));
|
BuilderUtils.parseTokensConf(app.submissionContext));
|
||||||
// set the memory free
|
|
||||||
app.submissionContext.getAMContainerSpec().setTokensConf(null);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String msg = "Failed to fetch user credentials from application:"
|
String msg = "Failed to fetch user credentials from application:"
|
||||||
+ e.getMessage();
|
+ e.getMessage();
|
||||||
|
@ -1089,8 +1087,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
app.submissionContext, false, app.applicationPriority));
|
app.submissionContext, false, app.applicationPriority));
|
||||||
// send the ATS create Event
|
// send the ATS create Event
|
||||||
app.sendATSCreateEvent();
|
app.sendATSCreateEvent();
|
||||||
// Set the memory free after submission context is persisted
|
|
||||||
app.submissionContext.getAMContainerSpec().setTokensConf(null);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -513,6 +513,17 @@ public class MockRM extends ResourceManager {
|
||||||
false, false, null, 0, null, true, priority);
|
false, false, null, 0, null, true, priority);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RMApp submitApp(int masterMemory, Priority priority,
|
||||||
|
Credentials cred, ByteBuffer tokensConf) throws Exception {
|
||||||
|
Resource resource = Resource.newInstance(masterMemory, 0);
|
||||||
|
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
||||||
|
.getShortUserName(), null, false, null,
|
||||||
|
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true,
|
||||||
|
false, false, null, 0, null, true, priority, null, null,
|
||||||
|
tokensConf);
|
||||||
|
}
|
||||||
|
|
||||||
public RMApp submitApp(int masterMemory, boolean unmanaged)
|
public RMApp submitApp(int masterMemory, boolean unmanaged)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -107,6 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
@ -2517,4 +2520,116 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testRMRestartAfterPriorityChangesInAllocatedResponse()
|
||||||
|
throws Exception {
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
|
||||||
|
// Set Max Application Priority as 10
|
||||||
|
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
||||||
|
10);
|
||||||
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||||
|
false);
|
||||||
|
|
||||||
|
//Start RM
|
||||||
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
MockRM rm = new TestSecurityMockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore();
|
||||||
|
|
||||||
|
// Register node1
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024);
|
||||||
|
|
||||||
|
// Submit an application
|
||||||
|
Priority appPriority1 = Priority.newInstance(5);
|
||||||
|
RMApp app1 = rm.submitApp(2048, appPriority1,
|
||||||
|
getCreds(), getTokensConf());
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
|
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||||
|
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||||
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
|
allocateRequest.setReleaseList(release);
|
||||||
|
allocateRequest.setAskList(ask);
|
||||||
|
|
||||||
|
AllocateResponse response1 = am1.allocate(allocateRequest);
|
||||||
|
Assert.assertEquals(appPriority1, response1.getApplicationPriority());
|
||||||
|
|
||||||
|
// Change the priority of App1 to 8
|
||||||
|
Priority appPriority2 = Priority.newInstance(8);
|
||||||
|
UserGroupInformation ugi = UserGroupInformation
|
||||||
|
.createRemoteUser(app1.getUser());
|
||||||
|
rm.getRMAppManager().updateApplicationPriority(ugi,
|
||||||
|
app1.getApplicationId(), appPriority2);
|
||||||
|
|
||||||
|
AllocateResponse response2 = am1.allocate(allocateRequest);
|
||||||
|
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ensure tokensConf has been retained even after UPDATE_APP event in
|
||||||
|
* RMStateStore, which gets triggered because of change in priority.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
||||||
|
memStore.getState().getApplicationState();
|
||||||
|
ApplicationStateData appState =
|
||||||
|
rmAppState.get(app1.getApplicationId());
|
||||||
|
Assert.assertEquals(getTokensConf(),
|
||||||
|
appState.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getTokensConf());
|
||||||
|
|
||||||
|
|
||||||
|
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
AllocateResponse response3 = am1.allocate(allocateRequest);
|
||||||
|
Assert.assertEquals(appPriority2, response3.getApplicationPriority());
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ensure tokensConf has been retained even after RECOVER event in
|
||||||
|
* RMStateStore, which gets triggered as part of RM START.
|
||||||
|
*/
|
||||||
|
Map<ApplicationId, ApplicationStateData> rmAppStateNew =
|
||||||
|
memStore.getState().getApplicationState();
|
||||||
|
ApplicationStateData appStateNew =
|
||||||
|
rmAppStateNew.get(app1.getApplicationId());
|
||||||
|
Assert.assertEquals(getTokensConf(),
|
||||||
|
appStateNew.getApplicationSubmissionContext().
|
||||||
|
getAMContainerSpec().getTokensConf());
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Credentials getCreds() throws IOException {
|
||||||
|
Credentials ts = new Credentials();
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
ts.writeTokenStorageToStream(dob);
|
||||||
|
return ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ByteBuffer getTokensConf() throws IOException {
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
Configuration appConf = new Configuration(false);
|
||||||
|
appConf.clear();
|
||||||
|
appConf.set("dfs.nameservices", "mycluster1,mycluster2");
|
||||||
|
appConf.set("dfs.namenode.rpc-address.mycluster2.nn1",
|
||||||
|
"123.0.0.1");
|
||||||
|
appConf.set("dfs.namenode.rpc-address.mycluster3.nn2",
|
||||||
|
"123.0.0.2");
|
||||||
|
appConf.write(dob);
|
||||||
|
ByteBuffer tokenConf =
|
||||||
|
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
return tokenConf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue