YARN-6016. Fix minor bugs in handling of local AMRMToken in AMRMProxy. (Botong Huang via Subru).

This commit is contained in:
Subru Krishnan 2017-01-17 14:47:27 -08:00
parent b1fce2b8b1
commit 4d1f3d9020
5 changed files with 31 additions and 7 deletions

View File

@ -139,11 +139,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
/* /*
* This test validates the token renewal from the AMRMPRoxy. The test verifies * This test validates the token renewal from the AMRMPRoxy. The test verifies
* that the received token it is different from the previous one within 5 * that the received token from AMRMProxy is different from the previous one
* requests. * within 5 requests.
*/ */
@Test(timeout = 120000) @Test(timeout = 120000)
public void testE2ETokenRenewal() throws Exception { public void testAMRMProxyTokenRenewal() throws Exception {
ApplicationMasterProtocol client; ApplicationMasterProtocol client;
try (MiniYARNCluster cluster = try (MiniYARNCluster cluster =
@ -176,7 +176,8 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
client.registerApplicationMaster(RegisterApplicationMasterRequest client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, "")); .newInstance(NetUtils.getHostname(), 1024, ""));
LOG.info("testAMRMPRoxy - Allocate Resources Application Master"); LOG.info(
"testAMRMProxyTokenRenewal - Allocate Resources Application Master");
AllocateRequest request = AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
@ -196,7 +197,7 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
lastToken = response.getAMRMToken(); lastToken = response.getAMRMToken();
// Time slot to be sure the RM renew the token // Time slot to be sure the AMRMProxy renew the token
Thread.sleep(1500); Thread.sleep(1500);
} }

View File

@ -115,7 +115,7 @@ public class AMRMProxyApplicationContextImpl implements
throw new YarnRuntimeException("Missing AMRM token for " throw new YarnRuntimeException("Missing AMRM token for "
+ this.applicationAttemptId); + this.applicationAttemptId);
} }
keyId = this.amrmToken.decodeIdentifier().getKeyId(); keyId = this.localToken.decodeIdentifier().getKeyId();
this.localTokenKeyId = keyId; this.localTokenKeyId = keyId;
} catch (IOException e) { } catch (IOException e) {
throw new YarnRuntimeException("AMRM token decode error for " throw new YarnRuntimeException("AMRM token decode error for "

View File

@ -342,9 +342,14 @@ public class AMRMProxyService extends AbstractService implements
// check to see if the RM has issued a new AMRMToken & accordingly update // check to see if the RM has issued a new AMRMToken & accordingly update
// the real ARMRMToken in the current context // the real ARMRMToken in the current context
if (allocateResponse.getAMRMToken() != null) { if (allocateResponse.getAMRMToken() != null) {
LOG.info("RM rolled master-key for amrm-tokens");
org.apache.hadoop.yarn.api.records.Token token = org.apache.hadoop.yarn.api.records.Token token =
allocateResponse.getAMRMToken(); allocateResponse.getAMRMToken();
// Do not propagate this info back to AM
allocateResponse.setAMRMToken(null);
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId = org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>( new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
token.getIdentifier().array(), token.getPassword().array(), token.getIdentifier().array(), token.getPassword().array(),

View File

@ -108,6 +108,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -297,10 +298,14 @@ public class MockResourceManagerFacade implements
Log.getLog().info("Allocating containers: " + containerList.size() Log.getLog().info("Allocating containers: " + containerList.size()
+ " for application attempt: " + conf.get("AMRMTOKEN")); + " for application attempt: " + conf.get("AMRMTOKEN"));
// Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
return AllocateResponse.newInstance(0, return AllocateResponse.newInstance(0,
new ArrayList<ContainerStatus>(), containerList, new ArrayList<ContainerStatus>(), containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
new ArrayList<NMToken>(), new ArrayList<NMToken>(), newAMRMToken,
new ArrayList<UpdatedContainer>()); new ArrayList<UpdatedContainer>());
} }

View File

@ -401,6 +401,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
AllocateResponse allocateResponse = allocate(appId, allocateRequest); AllocateResponse allocateResponse = allocate(appId, allocateRequest);
Assert.assertNotNull("allocate() returned null response", Assert.assertNotNull("allocate() returned null response",
allocateResponse); allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
containers.addAll(allocateResponse.getAllocatedContainers()); containers.addAll(allocateResponse.getAllocatedContainers());
@ -412,6 +415,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
allocate(appId, Records.newRecord(AllocateRequest.class)); allocate(appId, Records.newRecord(AllocateRequest.class));
Assert.assertNotNull("allocate() returned null response", Assert.assertNotNull("allocate() returned null response",
allocateResponse); allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
containers.addAll(allocateResponse.getAllocatedContainers()); containers.addAll(allocateResponse.getAllocatedContainers());
@ -447,6 +453,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
AllocateResponse allocateResponse = allocate(appId, allocateRequest); AllocateResponse allocateResponse = allocate(appId, allocateRequest);
Assert.assertNotNull(allocateResponse); Assert.assertNotNull(allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
// The way the mock resource manager is setup, it will return the containers // The way the mock resource manager is setup, it will return the containers
// that were released in the response. This is done because the UAMs run // that were released in the response. This is done because the UAMs run
@ -467,6 +476,10 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
allocateResponse = allocateResponse =
allocate(appId, Records.newRecord(AllocateRequest.class)); allocate(appId, Records.newRecord(AllocateRequest.class));
Assert.assertNotNull(allocateResponse); Assert.assertNotNull(allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
containersForReleasedContainerIds.addAll(allocateResponse containersForReleasedContainerIds.addAll(allocateResponse
.getAllocatedContainers()); .getAllocatedContainers());