YARN-10500. TestDelegationTokenRenewer fails intermittently. (#2619) Contributed by Masatake Iwasaki
This commit is contained in:
parent
a5a6c973af
commit
d0562d6cd0
|
@ -181,6 +181,10 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
private static Configuration conf;
|
||||
DelegationTokenRenewer delegationTokenRenewer;
|
||||
private MockRM rm;
|
||||
private MockRM rm1;
|
||||
private MockRM rm2;
|
||||
private DelegationTokenRenewer localDtr;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass() throws Exception {
|
||||
|
@ -225,8 +229,25 @@ public class TestDelegationTokenRenewer {
|
|||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
public void tearDown() throws Exception {
|
||||
delegationTokenRenewer.stop();
|
||||
|
||||
if (rm != null) {
|
||||
rm.close();
|
||||
rm = null;
|
||||
}
|
||||
if (rm1 != null) {
|
||||
rm1.close();
|
||||
rm1 = null;
|
||||
}
|
||||
if (rm2 != null) {
|
||||
rm2.close();
|
||||
rm2 = null;
|
||||
}
|
||||
if (localDtr != null) {
|
||||
localDtr.close();
|
||||
localDtr = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
|
||||
|
@ -574,8 +595,7 @@ public class TestDelegationTokenRenewer {
|
|||
lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
|
||||
true);
|
||||
|
||||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(lconf, counter);
|
||||
localDtr = createNewDelegationTokenRenewer(lconf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
|
@ -649,8 +669,7 @@ public class TestDelegationTokenRenewer {
|
|||
lconf.setLong(
|
||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||
1000l);
|
||||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(lconf, counter);
|
||||
localDtr = createNewDelegationTokenRenewer(lconf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
|
@ -729,8 +748,7 @@ public class TestDelegationTokenRenewer {
|
|||
lconf.setLong(
|
||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||
1000l);
|
||||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
|
@ -834,8 +852,7 @@ public class TestDelegationTokenRenewer {
|
|||
doThrow(new IOException("boom"))
|
||||
.when(tokenx).renew(any(Configuration.class));
|
||||
// fire up the renewer
|
||||
final DelegationTokenRenewer dtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
|
@ -844,13 +861,14 @@ public class TestDelegationTokenRenewer {
|
|||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
dtr.setRMContext(mockContext);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
||||
dtr.init(conf);
|
||||
dtr.start();
|
||||
localDtr.setRMContext(mockContext);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
|
||||
localDtr.init(conf);
|
||||
localDtr.start();
|
||||
|
||||
try {
|
||||
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
|
||||
localDtr.addApplicationSync(mock(ApplicationId.class),
|
||||
credsx, false, "user");
|
||||
fail("Catch IOException on app submission");
|
||||
} catch (IOException e){
|
||||
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
|
||||
|
@ -893,8 +911,8 @@ public class TestDelegationTokenRenewer {
|
|||
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
|
||||
|
||||
// fire up the renewer
|
||||
final DelegationTokenRenewer dtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
|
@ -903,24 +921,24 @@ public class TestDelegationTokenRenewer {
|
|||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
dtr.setRMContext(mockContext);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
||||
dtr.init(conf);
|
||||
dtr.start();
|
||||
localDtr.setRMContext(mockContext);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
|
||||
localDtr.init(conf);
|
||||
localDtr.start();
|
||||
// submit a job that blocks during renewal
|
||||
Thread submitThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user",
|
||||
new Configuration());
|
||||
localDtr.addApplicationAsync(mock(ApplicationId.class),
|
||||
creds1, false, "user", new Configuration());
|
||||
}
|
||||
};
|
||||
submitThread.start();
|
||||
|
||||
// wait till 1st submit blocks, then submit another
|
||||
startBarrier.await();
|
||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user",
|
||||
new Configuration());
|
||||
localDtr.addApplicationAsync(mock(ApplicationId.class),
|
||||
creds2, false, "user", new Configuration());
|
||||
// signal 1st to complete
|
||||
endBarrier.await();
|
||||
submitThread.join();
|
||||
|
@ -933,7 +951,7 @@ public class TestDelegationTokenRenewer {
|
|||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MockRM rm = new MockRM(conf) {
|
||||
rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Skip the login.
|
||||
|
@ -989,7 +1007,7 @@ public class TestDelegationTokenRenewer {
|
|||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||
|
||||
final MockRM rm = new TestSecurityMockRM(conf, null) {
|
||||
rm = new TestSecurityMockRM(conf, null) {
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
|
@ -1079,7 +1097,7 @@ public class TestDelegationTokenRenewer {
|
|||
Credentials credentials = new Credentials();
|
||||
credentials.addToken(userText1, originalToken);
|
||||
|
||||
MockRM rm1 = new TestSecurityMockRM(yarnConf);
|
||||
rm1 = new TestSecurityMockRM(yarnConf);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
RMApp app = rm1.submitApp(200, "name", "user",
|
||||
|
@ -1096,7 +1114,7 @@ public class TestDelegationTokenRenewer {
|
|||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||
final AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
|
||||
final AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
|
||||
MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
||||
rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
|
@ -1106,8 +1124,8 @@ public class TestDelegationTokenRenewer {
|
|||
throws IOException {
|
||||
|
||||
if (dttr.token.equals(updatedToken)) {
|
||||
secondRenewInvoked.set(true);
|
||||
super.renewToken(dttr);
|
||||
secondRenewInvoked.set(true);
|
||||
} else if (dttr.token.equals(originalToken)){
|
||||
firstRenewInvoked.set(true);
|
||||
throw new InvalidToken("Failed to renew");
|
||||
|
@ -1133,6 +1151,15 @@ public class TestDelegationTokenRenewer {
|
|||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
GenericTestUtils.waitFor(
|
||||
new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return secondRenewInvoked.get();
|
||||
}
|
||||
}, 100, 10000);
|
||||
|
||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||
ByteBuffer tokenBuffer =
|
||||
response.getSystemCredentialsForApps().get(app.getApplicationId());
|
||||
|
@ -1158,7 +1185,7 @@ public class TestDelegationTokenRenewer {
|
|||
final Token<DelegationTokenIdentifier> token2 =
|
||||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||
final MockRM rm = new TestSecurityMockRM(conf, null) {
|
||||
rm = new TestSecurityMockRM(conf, null) {
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
|
@ -1204,7 +1231,7 @@ public class TestDelegationTokenRenewer {
|
|||
// submitted application.
|
||||
@Test (timeout = 30000)
|
||||
public void testAppSubmissionWithPreviousToken() throws Exception{
|
||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
||||
rm = new TestSecurityMockRM(conf, null);
|
||||
rm.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
|
@ -1266,7 +1293,7 @@ public class TestDelegationTokenRenewer {
|
|||
// complete
|
||||
@Test (timeout = 30000)
|
||||
public void testCancelWithMultipleAppSubmissions() throws Exception{
|
||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
||||
rm = new TestSecurityMockRM(conf, null);
|
||||
rm.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
|
@ -1362,10 +1389,10 @@ public class TestDelegationTokenRenewer {
|
|||
Assert.assertFalse(renewer.getDelegationTokens().contains(token1));
|
||||
}
|
||||
|
||||
private void finishAMAndWaitForComplete(final RMApp app, MockRM rm,
|
||||
MockNM nm, MockAM am, final DelegationTokenToRenew dttr)
|
||||
private void finishAMAndWaitForComplete(final RMApp app, MockRM mockrm,
|
||||
MockNM mocknm, MockAM mockam, final DelegationTokenToRenew dttr)
|
||||
throws Exception {
|
||||
MockRM.finishAMAndVerifyAppState(app, rm, nm, am);
|
||||
MockRM.finishAMAndVerifyAppState(app, mockrm, mocknm, mockam);
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
return !dttr.referringAppIds.contains(app.getApplicationId());
|
||||
|
@ -1381,7 +1408,7 @@ public class TestDelegationTokenRenewer {
|
|||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
final MockRM rm = new TestSecurityMockRM(conf, null);
|
||||
rm = new TestSecurityMockRM(conf, null);
|
||||
rm.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
|
@ -1436,7 +1463,7 @@ public class TestDelegationTokenRenewer {
|
|||
UserGroupInformation.setConfiguration(conf);
|
||||
// limit 100 bytes
|
||||
conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100);
|
||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
||||
rm = new TestSecurityMockRM(conf, null);
|
||||
rm.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
|
|
Loading…
Reference in New Issue