MAPREDUCE-6230. Fixed RMContainerAllocator to update the new AMRMToken service name properly. Contributed by Jason Lowe

(cherry picked from commit cff05bff1f)

(cherry picked from commit 43b3b43cea)
(cherry picked from commit 013e271036554020e3d26de4fcbb5b6970f5a9c0)
This commit is contained in:
Jian He 2015-01-28 15:51:30 -08:00 committed by Vinod Kumar Vavilapalli
parent 8658945b3a
commit e917ef3cfe
3 changed files with 98 additions and 3 deletions

View File

@ -18,6 +18,9 @@ Release 2.6.1 - UNRELEASED
MAPREDUCE-6166. Reducers do not validate checksum of map outputs when
fetching directly to disk. (Eric Payne via gera)
MAPREDUCE-6230. Fixed RMContainerAllocator to update the new AMRMToken
service name properly. (Jason Lowe via jianhe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@ -763,10 +764,8 @@ public class RMContainerAllocator extends RMContainerRequestor
.getIdentifier().array(), token.getPassword().array(), new Text(
token.getKind()), new Text(token.getService()));
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) {
currentUGI = UserGroupInformation.getLoginUser();
}
currentUGI.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
}
@VisibleForTesting

View File

@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -74,7 +76,9 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -109,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
@ -2290,6 +2295,93 @@ public class TestRMContainerAllocator {
}
@Test(timeout=60000)
public void testAMRMTokenUpdate() throws Exception {
LOG.info("Running testAMRMTokenUpdate");
final String rmAddr = "somermaddress:1234";
final Configuration conf = new YarnConfiguration();
conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, 8);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 2000);
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, rmAddr);
final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
AMRMTokenSecretManager secretMgr =
rm.getRMContext().getAMRMTokenSecretManager();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
final ApplicationId appId = app.getApplicationId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
final Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps()
.get(appId).getRMAppAttempt(appAttemptId).getAMRMToken();
Assert.assertNotNull("app should have a token", oldToken);
UserGroupInformation testUgi = UserGroupInformation.createUserForTesting(
"someuser", new String[0]);
Token<AMRMTokenIdentifier> newToken = testUgi.doAs(
new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() {
@Override
public Token<AMRMTokenIdentifier> run() throws Exception {
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// Keep heartbeating until RM thinks the token has been updated
Token<AMRMTokenIdentifier> currentToken = oldToken;
long startTime = Time.monotonicNow();
while (currentToken == oldToken) {
if (Time.monotonicNow() - startTime > 20000) {
Assert.fail("Took to long to see AMRM token change");
}
Thread.sleep(100);
allocator.schedule();
currentToken = rm.getRMContext().getRMApps().get(appId)
.getRMAppAttempt(appAttemptId).getAMRMToken();
}
return currentToken;
}
});
// verify there is only one AMRM token in the UGI and it matches the
// updated token from the RM
int tokenCount = 0;
Token<? extends TokenIdentifier> ugiToken = null;
for (Token<? extends TokenIdentifier> token : testUgi.getTokens()) {
if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
ugiToken = token;
++tokenCount;
}
}
Assert.assertEquals("too many AMRM tokens", 1, tokenCount);
Assert.assertArrayEquals("token identifier not updated",
newToken.getIdentifier(), ugiToken.getIdentifier());
Assert.assertArrayEquals("token password not updated",
newToken.getPassword(), ugiToken.getPassword());
Assert.assertEquals("AMRM token service not updated",
new Text(rmAddr), ugiToken.getService());
}
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
@ -2299,6 +2391,7 @@ public class TestRMContainerAllocator {
t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes();
t.testCompletedTasksRecalculateSchedule();
t.testAMRMTokenUpdate();
}
}