MAPREDUCE-6230. Fixed RMContainerAllocator to update the new AMRMToken service name properly. Contributed by Jason Lowe
(cherry picked from commit cff05bff1f
)
This commit is contained in:
parent
e8300957a7
commit
43b3b43cea
|
@ -82,6 +82,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
MAPREDUCE-3283. mapred classpath CLI does not display the complete classpath
|
MAPREDUCE-3283. mapred classpath CLI does not display the complete classpath
|
||||||
(Varun Saxena via cnauroth)
|
(Varun Saxena via cnauroth)
|
||||||
|
|
||||||
|
MAPREDUCE-6230. Fixed RMContainerAllocator to update the new AMRMToken
|
||||||
|
service name properly. (Jason Lowe via jianhe)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||||
|
@ -763,10 +764,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
||||||
.getIdentifier().array(), token.getPassword().array(), new Text(
|
.getIdentifier().array(), token.getPassword().array(), new Text(
|
||||||
token.getKind()), new Text(token.getService()));
|
token.getKind()), new Text(token.getService()));
|
||||||
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
|
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
currentUGI = UserGroupInformation.getLoginUser();
|
|
||||||
}
|
|
||||||
currentUGI.addToken(amrmToken);
|
currentUGI.addToken(amrmToken);
|
||||||
|
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
|
@ -74,7 +76,9 @@ import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -109,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
|
@ -2290,6 +2295,93 @@ public class TestRMContainerAllocator {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testAMRMTokenUpdate() throws Exception {
|
||||||
|
LOG.info("Running testAMRMTokenUpdate");
|
||||||
|
|
||||||
|
final String rmAddr = "somermaddress:1234";
|
||||||
|
final Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setLong(
|
||||||
|
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, 8);
|
||||||
|
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 2000);
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, rmAddr);
|
||||||
|
|
||||||
|
final MyResourceManager rm = new MyResourceManager(conf);
|
||||||
|
rm.start();
|
||||||
|
AMRMTokenSecretManager secretMgr =
|
||||||
|
rm.getRMContext().getAMRMTokenSecretManager();
|
||||||
|
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
|
||||||
|
.getDispatcher();
|
||||||
|
|
||||||
|
// Submit the application
|
||||||
|
RMApp app = rm.submitApp(1024);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||||
|
amNodeManager.nodeHeartbeat(true);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId();
|
||||||
|
final ApplicationId appId = app.getApplicationId();
|
||||||
|
rm.sendAMLaunched(appAttemptId);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
||||||
|
final Job mockJob = mock(Job.class);
|
||||||
|
when(mockJob.getReport()).thenReturn(
|
||||||
|
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
||||||
|
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
||||||
|
|
||||||
|
final Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps()
|
||||||
|
.get(appId).getRMAppAttempt(appAttemptId).getAMRMToken();
|
||||||
|
Assert.assertNotNull("app should have a token", oldToken);
|
||||||
|
UserGroupInformation testUgi = UserGroupInformation.createUserForTesting(
|
||||||
|
"someuser", new String[0]);
|
||||||
|
Token<AMRMTokenIdentifier> newToken = testUgi.doAs(
|
||||||
|
new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() {
|
||||||
|
@Override
|
||||||
|
public Token<AMRMTokenIdentifier> run() throws Exception {
|
||||||
|
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||||
|
appAttemptId, mockJob);
|
||||||
|
|
||||||
|
// Keep heartbeating until RM thinks the token has been updated
|
||||||
|
Token<AMRMTokenIdentifier> currentToken = oldToken;
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
while (currentToken == oldToken) {
|
||||||
|
if (Time.monotonicNow() - startTime > 20000) {
|
||||||
|
Assert.fail("Took to long to see AMRM token change");
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
allocator.schedule();
|
||||||
|
currentToken = rm.getRMContext().getRMApps().get(appId)
|
||||||
|
.getRMAppAttempt(appAttemptId).getAMRMToken();
|
||||||
|
}
|
||||||
|
|
||||||
|
return currentToken;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// verify there is only one AMRM token in the UGI and it matches the
|
||||||
|
// updated token from the RM
|
||||||
|
int tokenCount = 0;
|
||||||
|
Token<? extends TokenIdentifier> ugiToken = null;
|
||||||
|
for (Token<? extends TokenIdentifier> token : testUgi.getTokens()) {
|
||||||
|
if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
|
||||||
|
ugiToken = token;
|
||||||
|
++tokenCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals("too many AMRM tokens", 1, tokenCount);
|
||||||
|
Assert.assertArrayEquals("token identifier not updated",
|
||||||
|
newToken.getIdentifier(), ugiToken.getIdentifier());
|
||||||
|
Assert.assertArrayEquals("token password not updated",
|
||||||
|
newToken.getPassword(), ugiToken.getPassword());
|
||||||
|
Assert.assertEquals("AMRM token service not updated",
|
||||||
|
new Text(rmAddr), ugiToken.getService());
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
||||||
t.testSimple();
|
t.testSimple();
|
||||||
|
@ -2299,6 +2391,7 @@ public class TestRMContainerAllocator {
|
||||||
t.testReportedAppProgressWithOnlyMaps();
|
t.testReportedAppProgressWithOnlyMaps();
|
||||||
t.testBlackListedNodes();
|
t.testBlackListedNodes();
|
||||||
t.testCompletedTasksRecalculateSchedule();
|
t.testCompletedTasksRecalculateSchedule();
|
||||||
|
t.testAMRMTokenUpdate();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue