YARN-3104. Fixed RM to not generate new AMRM tokens on every heartbeat between rolling and activation. Contributed by Jason Lowe
(cherry picked from commit 18297e0972
)
This commit is contained in:
parent
4d0e792a82
commit
b5d6f76c6a
|
@ -537,6 +537,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-1580. Documentation error regarding "container-allocation.expiry-interval-ms"
|
||||
(Brahma Reddy Battula via junping_du)
|
||||
|
||||
YARN-3104. Fixed RM to not generate new AMRM tokens on every heartbeat
|
||||
between rolling and activation. (Jason Lowe via jianhe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -588,16 +588,20 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
if (nextMasterKey != null
|
||||
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
|
||||
.getKeyId()) {
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
appAttemptId);
|
||||
((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
|
||||
RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
|
||||
Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
|
||||
if (nextMasterKey.getMasterKey().getKeyId() !=
|
||||
appAttemptImpl.getAMRMTokenKeyId()) {
|
||||
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
|
||||
+ " to application: " + applicationId);
|
||||
amrmToken = rmContext.getAMRMTokenSecretManager()
|
||||
.createAndGetAMRMToken(appAttemptId);
|
||||
appAttemptImpl.setAMRMToken(amrmToken);
|
||||
}
|
||||
allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
|
||||
.newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
|
||||
.toString(), amrmToken.getPassword(), amrmToken.getService()
|
||||
.toString()));
|
||||
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
|
||||
+ " to application: " + applicationId);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
|||
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
|
@ -132,6 +134,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final ApplicationSubmissionContext submissionContext;
|
||||
private Token<AMRMTokenIdentifier> amrmToken = null;
|
||||
private volatile Integer amrmTokenKeyId = null;
|
||||
private SecretKey clientTokenMasterKey = null;
|
||||
|
||||
private ConcurrentMap<NodeId, List<ContainerStatus>>
|
||||
|
@ -593,11 +596,34 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.writeLock.lock();
|
||||
try {
|
||||
this.amrmToken = lastToken;
|
||||
this.amrmTokenKeyId = null;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public int getAMRMTokenKeyId() {
|
||||
Integer keyId = this.amrmTokenKeyId;
|
||||
if (keyId == null) {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
if (this.amrmToken == null) {
|
||||
throw new YarnRuntimeException("Missing AMRM token for "
|
||||
+ this.applicationAttemptId);
|
||||
}
|
||||
keyId = this.amrmToken.decodeIdentifier().getKeyId();
|
||||
this.amrmTokenKeyId = keyId;
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException("AMRM token decode error for "
|
||||
+ this.applicationAttemptId, e);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
return keyId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<ClientToAMTokenIdentifier> createClientToken(String client) {
|
||||
this.readLock.lock();
|
||||
|
@ -846,9 +872,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
}
|
||||
|
||||
this.amrmToken =
|
||||
rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
applicationAttemptId);
|
||||
setAMRMToken(rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
applicationAttemptId));
|
||||
}
|
||||
|
||||
private static class BaseTransition implements
|
||||
|
|
|
@ -210,37 +210,25 @@ public class MockAM {
|
|||
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
|
||||
throws Exception {
|
||||
final AllocateRequest req =
|
||||
AllocateRequest.newInstance(++responseId, 0F, resourceRequest,
|
||||
AllocateRequest.newInstance(0, 0F, resourceRequest,
|
||||
releases, null);
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||
Token<AMRMTokenIdentifier> token =
|
||||
context.getRMApps().get(attemptId.getApplicationId())
|
||||
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||
try {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||
@Override
|
||||
public AllocateResponse run() throws Exception {
|
||||
return amRMProtocol.allocate(req);
|
||||
}
|
||||
});
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw (Exception) e.getCause();
|
||||
}
|
||||
return allocate(req);
|
||||
}
|
||||
|
||||
public AllocateResponse allocate(AllocateRequest allocateRequest)
|
||||
throws Exception {
|
||||
final AllocateRequest req = allocateRequest;
|
||||
req.setResponseId(++responseId);
|
||||
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||
Token<AMRMTokenIdentifier> token =
|
||||
context.getRMApps().get(attemptId.getApplicationId())
|
||||
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||
return doAllocateAs(ugi, allocateRequest);
|
||||
}
|
||||
|
||||
public AllocateResponse doAllocateAs(UserGroupInformation ugi,
|
||||
final AllocateRequest req) throws Exception {
|
||||
req.setResponseId(++responseId);
|
||||
try {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -18,11 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import static org.mockito.Mockito.isA;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -49,6 +56,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
|
@ -207,7 +216,6 @@ public class TestAMRMTokens {
|
|||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testMasterKeyRollOver() throws Exception {
|
||||
|
||||
|
@ -336,21 +344,40 @@ public class TestAMRMTokens {
|
|||
|
||||
@Test (timeout = 20000)
|
||||
public void testAMRMMasterKeysUpdate() throws Exception {
|
||||
final AtomicReference<AMRMTokenSecretManager> spySecretMgrRef =
|
||||
new AtomicReference<AMRMTokenSecretManager>();
|
||||
MockRM rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Skip the login.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RMSecretManagerService createRMSecretManagerService() {
|
||||
return new RMSecretManagerService(conf, rmContext) {
|
||||
@Override
|
||||
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
|
||||
Configuration conf, RMContext rmContext) {
|
||||
AMRMTokenSecretManager spySecretMgr = spy(
|
||||
super.createAMRMTokenSecretManager(conf, rmContext));
|
||||
spySecretMgrRef.set(spySecretMgr);
|
||||
return spySecretMgr;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
MockNM nm = rm.registerNode("127.0.0.1:1234", 8000);
|
||||
RMApp app = rm.submitApp(200);
|
||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
|
||||
|
||||
AMRMTokenSecretManager spySecretMgr = spySecretMgrRef.get();
|
||||
// Do allocate. Should not update AMRMToken
|
||||
AllocateResponse response =
|
||||
am.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNull(response.getAMRMToken());
|
||||
Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps()
|
||||
.get(app.getApplicationId())
|
||||
.getRMAppAttempt(am.getApplicationAttemptId()).getAMRMToken();
|
||||
|
||||
// roll over the master key
|
||||
// Do allocate again. the AM should get the latest AMRMToken
|
||||
|
@ -366,8 +393,18 @@ public class TestAMRMTokens {
|
|||
.getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey()
|
||||
.getKeyId());
|
||||
|
||||
// Do allocate again. The master key does not update.
|
||||
// AM should not update its AMRMToken either
|
||||
// Do allocate again with the same old token and verify the RM sends
|
||||
// back the last generated token instead of generating it again.
|
||||
reset(spySecretMgr);
|
||||
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
am.getApplicationAttemptId().toString(), new String[0]);
|
||||
ugi.addTokenIdentifier(oldToken.decodeIdentifier());
|
||||
response = am.doAllocateAs(ugi, Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNotNull(response.getAMRMToken());
|
||||
verify(spySecretMgr, never()).createAndGetAMRMToken(isA(ApplicationAttemptId.class));
|
||||
|
||||
// Do allocate again with the updated token and verify we do not
|
||||
// receive a new token to use.
|
||||
response = am.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNull(response.getAMRMToken());
|
||||
|
||||
|
|
Loading…
Reference in New Issue