From cff05bff1fe24628677d41a0d537f2c383b44faf Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 28 Jan 2015 15:51:30 -0800 Subject: [PATCH] MAPREDUCE-6230. Fixed RMContainerAllocator to update the new AMRMToken service name properly. Contributed by Jason Lowe --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/rm/RMContainerAllocator.java | 5 +- .../v2/app/rm/TestRMContainerAllocator.java | 93 +++++++++++++++++++ 3 files changed, 98 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 35ceb2e51fc..b576c292978 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -314,6 +314,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-3283. mapred classpath CLI does not display the complete classpath (Varun Saxena via cnauroth) + MAPREDUCE-6230. Fixed RMContainerAllocator to update the new AMRMToken + service name properly. (Jason Lowe via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 0a4f2f3989e..1acfeec163b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -783,10 +784,8 @@ public class RMContainerAllocator extends RMContainerRequestor .getIdentifier().array(), token.getPassword().array(), new Text( token.getKind()), new Text(token.getService())); UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); - if (UserGroupInformation.isSecurityEnabled()) { - currentUGI = UserGroupInformation.getLoginUser(); - } currentUGI.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); } @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 36426702081..4759693a916 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -75,7 +77,9 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -110,6 +114,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; @@ -2295,6 +2300,93 @@ public class TestRMContainerAllocator { } + @Test(timeout=60000) + public void testAMRMTokenUpdate() throws Exception { + LOG.info("Running testAMRMTokenUpdate"); + + final String rmAddr = "somermaddress:1234"; + final Configuration conf = new YarnConfiguration(); + conf.setLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, 8); + conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 2000); + conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, rmAddr); + + final MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + AMRMTokenSecretManager secretMgr = + rm.getRMContext().getAMRMTokenSecretManager(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + final ApplicationId appId = app.getApplicationId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + final Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + + final Token oldToken = rm.getRMContext().getRMApps() + .get(appId).getRMAppAttempt(appAttemptId).getAMRMToken(); + Assert.assertNotNull("app should have a token", oldToken); + UserGroupInformation testUgi = UserGroupInformation.createUserForTesting( + "someuser", new String[0]); + Token newToken = testUgi.doAs( + new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // Keep heartbeating until RM thinks the token has been updated + Token currentToken = oldToken; + long startTime = Time.monotonicNow(); + while (currentToken == oldToken) { + if (Time.monotonicNow() - startTime > 20000) { + Assert.fail("Took to long to see AMRM token change"); + } + Thread.sleep(100); + allocator.schedule(); + currentToken = rm.getRMContext().getRMApps().get(appId) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + } + + return currentToken; + } + }); + + // verify there is only one AMRM token in the UGI and it matches the + // updated token from the RM + int tokenCount = 0; + Token ugiToken = null; + for (Token token : testUgi.getTokens()) { + if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) { + ugiToken = token; + ++tokenCount; + } + } + + Assert.assertEquals("too many AMRM tokens", 1, tokenCount); + Assert.assertArrayEquals("token identifier not updated", + newToken.getIdentifier(), ugiToken.getIdentifier()); + Assert.assertArrayEquals("token password not updated", + newToken.getPassword(), ugiToken.getPassword()); + Assert.assertEquals("AMRM token service not updated", + new Text(rmAddr), ugiToken.getService()); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); @@ -2304,6 +2396,7 @@ public class TestRMContainerAllocator { t.testReportedAppProgressWithOnlyMaps(); t.testBlackListedNodes(); t.testCompletedTasksRecalculateSchedule(); + t.testAMRMTokenUpdate(); } }