YARN-2212 ApplicationMaster needs to find a way to update the AMRMToken periodically. Contributed by Xuan Gong
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616891 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0960e67787
commit
4365c4530b
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app.rm;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.JobCounter;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
|
@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringInterner;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -68,9 +71,11 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.RackResolver;
|
||||
|
||||
|
@ -664,7 +669,12 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
nmToken.getToken());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Setting AMRMToken
|
||||
if (response.getAMRMToken() != null) {
|
||||
updateAMRMToken(response.getAMRMToken());
|
||||
}
|
||||
|
||||
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
|
||||
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
|
||||
//something changed
|
||||
|
@ -706,7 +716,19 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
}
|
||||
return newContainers;
|
||||
}
|
||||
|
||||
|
||||
private void updateAMRMToken(Token token) throws IOException {
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
|
||||
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
|
||||
.getIdentifier().array(), token.getPassword().array(), new Text(
|
||||
token.getKind()), new Text(token.getService()));
|
||||
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
currentUGI = UserGroupInformation.getLoginUser();
|
||||
}
|
||||
currentUGI.addToken(amrmToken);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
|
||||
TaskAttemptId attemptID) {
|
||||
|
|
|
@ -79,6 +79,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2352. FairScheduler: Collect metrics on duration of critical methods that
|
||||
affect performance. (kasha)
|
||||
|
||||
YARN-2212. ApplicationMaster needs to find a way to update the AMRMToken
|
||||
periodically. (xgong)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
|
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
* <li>A list of nodes whose status has been updated.</li>
|
||||
* <li>The number of available nodes in a cluster.</li>
|
||||
* <li>A description of resources requested back by the cluster</li>
|
||||
* <li>AMRMToken, if AMRMToken has been rolled over</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
|
@ -102,6 +104,23 @@ public abstract class AllocateResponse {
|
|||
return response;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static AllocateResponse newInstance(int responseId,
|
||||
List<ContainerStatus> completedContainers,
|
||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||
Resource availResources, AMCommand command, int numClusterNodes,
|
||||
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
|
||||
List<ContainerResourceIncrease> increasedContainers,
|
||||
List<ContainerResourceDecrease> decreasedContainers) {
|
||||
AllocateResponse response =
|
||||
newInstance(responseId, completedContainers, allocatedContainers,
|
||||
updatedNodes, availResources, command, numClusterNodes, preempt,
|
||||
nmTokens, increasedContainers, decreasedContainers);
|
||||
response.setAMRMToken(amRMToken);
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the <code>ResourceManager</code> needs the
|
||||
* <code>ApplicationMaster</code> to take some action then it will send an
|
||||
|
@ -270,4 +289,17 @@ public abstract class AllocateResponse {
|
|||
@Unstable
|
||||
public abstract void setDecreasedContainers(
|
||||
List<ContainerResourceDecrease> decreasedContainers);
|
||||
|
||||
/**
|
||||
* The AMRMToken that belong to this attempt
|
||||
*
|
||||
* @return The AMRMToken that belong to this attempt
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract Token getAMRMToken();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setAMRMToken(Token amRMToken);
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ message AllocateResponseProto {
|
|||
repeated NMTokenProto nm_tokens = 9;
|
||||
repeated ContainerResourceIncreaseProto increased_containers = 10;
|
||||
repeated ContainerResourceDecreaseProto decreased_containers = 11;
|
||||
optional hadoop.common.TokenProto am_rm_token = 12;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
|
|
|
@ -39,7 +39,9 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
|
@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
|
@ -64,6 +67,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.RackResolver;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -300,6 +304,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
if (!allocateResponse.getNMTokens().isEmpty()) {
|
||||
populateNMTokens(allocateResponse.getNMTokens());
|
||||
}
|
||||
if (allocateResponse.getAMRMToken() != null) {
|
||||
updateAMRMToken(allocateResponse.getAMRMToken());
|
||||
}
|
||||
if (!pendingRelease.isEmpty()
|
||||
&& !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
|
||||
removePendingReleaseRequests(allocateResponse
|
||||
|
@ -743,4 +750,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
"blacklistRemovals in updateBlacklist.");
|
||||
}
|
||||
}
|
||||
|
||||
private void updateAMRMToken(Token token) throws IOException {
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
|
||||
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
|
||||
.getIdentifier().array(), token.getPassword().array(), new Text(
|
||||
token.getKind()), new Text(token.getService()));
|
||||
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
currentUGI = UserGroupInformation.getLoginUser();
|
||||
}
|
||||
currentUGI.addToken(amrmToken);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,19 +27,23 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
|
@ -71,9 +75,12 @@ import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
|||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
|
@ -93,6 +100,9 @@ public class TestAMRMClient {
|
|||
static ApplicationAttemptId attemptId = null;
|
||||
static int nodeCount = 3;
|
||||
|
||||
static final int rolling_interval_sec = 13;
|
||||
static final long am_expire_ms = 4000;
|
||||
|
||||
static Resource capability;
|
||||
static Priority priority;
|
||||
static Priority priority2;
|
||||
|
@ -106,6 +116,10 @@ public class TestAMRMClient {
|
|||
public static void setup() throws Exception {
|
||||
// start minicluster
|
||||
conf = new YarnConfiguration();
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
rolling_interval_sec);
|
||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
|
||||
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
||||
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
||||
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
||||
|
@ -809,4 +823,123 @@ public class TestAMRMClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
|
||||
IOException {
|
||||
AMRMClient<ContainerRequest> amClient = null;
|
||||
try {
|
||||
AMRMTokenSecretManager amrmTokenSecretManager =
|
||||
yarnCluster.getResourceManager().getRMContext()
|
||||
.getAMRMTokenSecretManager();
|
||||
|
||||
// start am rm client
|
||||
amClient = AMRMClient.<ContainerRequest> createAMRMClient();
|
||||
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
|
||||
Long startTime = System.currentTimeMillis();
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
|
||||
getAMRMToken();
|
||||
Assert.assertNotNull(amrmToken_1);
|
||||
Assert.assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
|
||||
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
||||
|
||||
// Wait for enough time and make sure the roll_over happens
|
||||
// At mean time, the old AMRMToken should continue to work
|
||||
while (System.currentTimeMillis() - startTime <
|
||||
rolling_interval_sec * 1000) {
|
||||
amClient.allocate(0.1f);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
amClient.allocate(0.1f);
|
||||
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
|
||||
getAMRMToken();
|
||||
Assert.assertNotNull(amrmToken_2);
|
||||
Assert.assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
|
||||
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
||||
|
||||
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
|
||||
|
||||
// can do the allocate call with latest AMRMToken
|
||||
amClient.allocate(0.1f);
|
||||
|
||||
// Make sure previous token has been rolled-over
|
||||
// and can not use this rolled-over token to make a allocate all.
|
||||
while (true) {
|
||||
if (amrmToken_2.decodeIdentifier().getKeyId() != amrmTokenSecretManager
|
||||
.getCurrnetMasterKeyData().getMasterKey().getKeyId()) {
|
||||
if (amrmTokenSecretManager.getNextMasterKeyData() == null) {
|
||||
break;
|
||||
} else if (amrmToken_2.decodeIdentifier().getKeyId() !=
|
||||
amrmTokenSecretManager.getNextMasterKeyData().getMasterKey()
|
||||
.getKeyId()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
amClient.allocate(0.1f);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// DO NOTHING
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
UserGroupInformation testUser =
|
||||
UserGroupInformation.createRemoteUser("testUser");
|
||||
SecurityUtil.setTokenService(amrmToken_2, yarnCluster
|
||||
.getResourceManager().getApplicationMasterService().getBindAddress());
|
||||
testUser.addToken(amrmToken_2);
|
||||
testUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||
@Override
|
||||
public ApplicationMasterProtocol run() {
|
||||
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
|
||||
ApplicationMasterProtocol.class,
|
||||
yarnCluster.getResourceManager().getApplicationMasterService()
|
||||
.getBindAddress(), conf);
|
||||
}
|
||||
}).allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.fail("The old Token should not work");
|
||||
} catch (Exception ex) {
|
||||
Assert.assertTrue(ex instanceof InvalidToken);
|
||||
Assert.assertTrue(ex.getMessage().contains(
|
||||
"Invalid AMRMToken from "
|
||||
+ amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
|
||||
}
|
||||
|
||||
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||
null, null);
|
||||
|
||||
} finally {
|
||||
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||
amClient.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
|
||||
getAMRMToken() throws IOException {
|
||||
Credentials credentials =
|
||||
UserGroupInformation.getCurrentUser().getCredentials();
|
||||
Iterator<org.apache.hadoop.security.token.Token<?>> iter =
|
||||
credentials.getAllTokens().iterator();
|
||||
while (iter.hasNext()) {
|
||||
org.apache.hadoop.security.token.Token<?> token = iter.next();
|
||||
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
||||
return (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)
|
||||
token;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
@ -26,7 +27,11 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -42,10 +47,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
|
@ -58,12 +65,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAMRMClientOnRMRestart {
|
||||
static Configuration conf = null;
|
||||
static final int rolling_interval_sec = 13;
|
||||
static final long am_expire_ms = 4000;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
|
@ -362,6 +373,134 @@ public class TestAMRMClientOnRMRestart {
|
|||
|
||||
}
|
||||
|
||||
|
||||
// Test verify for AM issued with rolled-over AMRMToken
|
||||
// is still able to communicate with restarted RM.
|
||||
@Test(timeout = 30000)
|
||||
public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception {
|
||||
conf.setLong(
|
||||
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
rolling_interval_sec);
|
||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start first RM
|
||||
MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
|
||||
rm1.start();
|
||||
DrainDispatcher dispatcher =
|
||||
(DrainDispatcher) rm1.getRMContext().getDispatcher();
|
||||
Long startTime = System.currentTimeMillis();
|
||||
// Submit the application
|
||||
RMApp app = rm1.submitApp(1024);
|
||||
dispatcher.await();
|
||||
|
||||
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
nm1.nodeHeartbeat(true); // Node heartbeat
|
||||
dispatcher.await();
|
||||
|
||||
ApplicationAttemptId appAttemptId =
|
||||
app.getCurrentAppAttempt().getAppAttemptId();
|
||||
rm1.sendAMLaunched(appAttemptId);
|
||||
dispatcher.await();
|
||||
|
||||
AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
|
||||
rm1.getRMContext().getAMRMTokenSecretManager();
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
|
||||
amrmTokenSecretManagerForRM1.createAndGetAMRMToken(appAttemptId);
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||
|
||||
AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
|
||||
amClient.registerApplicationMaster("h1", 10000, "");
|
||||
amClient.allocate(0.1f);
|
||||
|
||||
// Wait for enough time and make sure the roll_over happens
|
||||
// At mean time, the old AMRMToken should continue to work
|
||||
while (System.currentTimeMillis() - startTime < rolling_interval_sec * 1000) {
|
||||
amClient.allocate(0.1f);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// DO NOTHING
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(amrmTokenSecretManagerForRM1.getMasterKey()
|
||||
.getMasterKey().getKeyId() != token.decodeIdentifier().getKeyId());
|
||||
|
||||
amClient.allocate(0.1f);
|
||||
|
||||
// active the nextMasterKey, and replace the currentMasterKey
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
|
||||
amrmTokenSecretManagerForRM1.createAndGetAMRMToken(appAttemptId);
|
||||
int waitCount = 0;
|
||||
while (waitCount++ <= 50) {
|
||||
if (amrmTokenSecretManagerForRM1.getCurrnetMasterKeyData().getMasterKey()
|
||||
.getKeyId() != token.decodeIdentifier().getKeyId()) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
amClient.allocate(0.1f);
|
||||
} catch (Exception ex) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(500);
|
||||
}
|
||||
Assert
|
||||
.assertTrue(amrmTokenSecretManagerForRM1.getNextMasterKeyData() == null);
|
||||
Assert.assertTrue(amrmTokenSecretManagerForRM1.getCurrnetMasterKeyData()
|
||||
.getMasterKey().getKeyId() == newToken.decodeIdentifier().getKeyId());
|
||||
|
||||
// start 2nd RM
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:9030");
|
||||
final MyResourceManager2 rm2 = new MyResourceManager2(conf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
|
||||
dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
|
||||
|
||||
AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
|
||||
rm2.getRMContext().getAMRMTokenSecretManager();
|
||||
Assert.assertTrue(amrmTokenSecretManagerForRM2.getCurrnetMasterKeyData()
|
||||
.getMasterKey().getKeyId() == newToken.decodeIdentifier().getKeyId());
|
||||
Assert
|
||||
.assertTrue(amrmTokenSecretManagerForRM2.getNextMasterKeyData() == null);
|
||||
|
||||
try {
|
||||
UserGroupInformation testUser =
|
||||
UserGroupInformation.createRemoteUser("testUser");
|
||||
SecurityUtil.setTokenService(token, rm2.getApplicationMasterService()
|
||||
.getBindAddress());
|
||||
testUser.addToken(token);
|
||||
testUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||
@Override
|
||||
public ApplicationMasterProtocol run() {
|
||||
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
|
||||
ApplicationMasterProtocol.class,
|
||||
rm2.getApplicationMasterService().getBindAddress(), conf);
|
||||
}
|
||||
}).allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.fail("The old Token should not work");
|
||||
} catch (Exception ex) {
|
||||
Assert.assertTrue(ex instanceof InvalidToken);
|
||||
Assert.assertTrue(ex.getMessage().contains(
|
||||
"Invalid AMRMToken from "
|
||||
+ token.decodeIdentifier().getApplicationAttemptId()));
|
||||
}
|
||||
|
||||
// make sure the recovered AMRMToken works for new RM
|
||||
amClient.allocate(0.1f);
|
||||
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||
null, null);
|
||||
amClient.stop();
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
private static class MyFifoScheduler extends FifoScheduler {
|
||||
|
||||
public MyFifoScheduler(RMContext rmContext) {
|
||||
|
@ -445,6 +584,18 @@ public class TestAMRMClientOnRMRestart {
|
|||
}
|
||||
}
|
||||
|
||||
private static class MyResourceManager2 extends MyResourceManager {
|
||||
|
||||
public MyResourceManager2(Configuration conf, RMStateStore store) {
|
||||
super(conf, store);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
return new ApplicationMasterService(getRMContext(), scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
private static class MyAMRMClientImpl extends
|
||||
AMRMClientImpl<ContainerRequest> {
|
||||
private MyResourceManager rm;
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl;
|
||||
|
@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
|
|||
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto;
|
||||
|
@ -74,7 +77,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
|
||||
private List<NodeReport> updatedNodes = null;
|
||||
private PreemptionMessage preempt;
|
||||
|
||||
private Token amrmToken = null;
|
||||
|
||||
public AllocateResponsePBImpl() {
|
||||
builder = AllocateResponseProto.newBuilder();
|
||||
|
@ -154,6 +157,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
getChangeProtoIterable(this.decreasedContainers);
|
||||
builder.addAllDecreasedContainers(iterable);
|
||||
}
|
||||
if (this.amrmToken != null) {
|
||||
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void mergeLocalToProto() {
|
||||
|
@ -357,6 +363,28 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
this.decreasedContainers.addAll(decreasedContainers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Token getAMRMToken() {
|
||||
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (amrmToken != null) {
|
||||
return amrmToken;
|
||||
}
|
||||
if (!p.hasAmRmToken()) {
|
||||
return null;
|
||||
}
|
||||
this.amrmToken = convertFromProtoFormat(p.getAmRmToken());
|
||||
return amrmToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setAMRMToken(Token amRMToken) {
|
||||
maybeInitBuilder();
|
||||
if (amRMToken == null) {
|
||||
builder.clearAmRmToken();
|
||||
}
|
||||
this.amrmToken = amRMToken;
|
||||
}
|
||||
|
||||
private synchronized void initLocalIncreasedContainerList() {
|
||||
if (this.increasedContainers != null) {
|
||||
return;
|
||||
|
@ -699,4 +727,12 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
|
||||
return new NMTokenPBImpl(proto);
|
||||
}
|
||||
|
||||
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||
return new TokenPBImpl(p);
|
||||
}
|
||||
|
||||
private TokenProto convertToProtoFormat(Token t) {
|
||||
return ((TokenPBImpl)t).getProto();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server;
|
|||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
|
@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -189,7 +192,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
return result;
|
||||
}
|
||||
|
||||
private ApplicationAttemptId authorizeRequest()
|
||||
private AMRMTokenIdentifier authorizeRequest()
|
||||
throws YarnException {
|
||||
|
||||
UserGroupInformation remoteUgi;
|
||||
|
@ -226,7 +229,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
throw RPCUtil.getRemoteException(message);
|
||||
}
|
||||
|
||||
return appTokenIdentifier.getApplicationAttemptId();
|
||||
return appTokenIdentifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -234,7 +237,9 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
|
||||
ApplicationAttemptId applicationAttemptId = authorizeRequest();
|
||||
AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
amrmTokenIdentifier.getApplicationAttemptId();
|
||||
|
||||
ApplicationId appID = applicationAttemptId.getApplicationId();
|
||||
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
|
||||
|
@ -333,7 +338,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
FinishApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
|
||||
ApplicationAttemptId applicationAttemptId = authorizeRequest();
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
authorizeRequest().getApplicationAttemptId();
|
||||
|
||||
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
|
||||
if (lock == null) {
|
||||
|
@ -408,7 +414,10 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
ApplicationAttemptId appAttemptId = authorizeRequest();
|
||||
AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
|
||||
|
||||
ApplicationAttemptId appAttemptId =
|
||||
amrmTokenIdentifier.getApplicationAttemptId();
|
||||
|
||||
this.amLivelinessMonitor.receivedPing(appAttemptId);
|
||||
|
||||
|
@ -557,6 +566,23 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
allocateResponse
|
||||
.setPreemptionMessage(generatePreemptionMessage(allocation));
|
||||
|
||||
// update AMRMToken if the token is rolled-up
|
||||
MasterKeyData nextMasterKey =
|
||||
this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
|
||||
|
||||
if (nextMasterKey != null
|
||||
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
|
||||
.getKeyId()) {
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
appAttemptId);
|
||||
((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
|
||||
allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
|
||||
.newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
|
||||
.toString(), amrmToken.getPassword(), amrmToken.getService()
|
||||
.toString()));
|
||||
}
|
||||
|
||||
/*
|
||||
* As we are updating the response inside the lock object so we don't
|
||||
* need to worry about unregister call occurring in between (which
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
|
@ -226,7 +227,7 @@ public class AMLauncher implements Runnable {
|
|||
}
|
||||
|
||||
// Add AMRMToken
|
||||
Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
|
||||
Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
|
||||
if (amrmToken != null) {
|
||||
credentials.addToken(amrmToken.getService(), amrmToken);
|
||||
}
|
||||
|
@ -236,8 +237,12 @@ public class AMLauncher implements Runnable {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
return application.getAMRMToken();
|
||||
protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
application.getAppAttemptId());
|
||||
((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
|
||||
return amrmToken;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -71,6 +71,10 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
* FileSystem interface. Does not use directories so that simple key-value
|
||||
* stores can be used. The retry policy for the real filesystem client must be
|
||||
* configured separately to enable retry of filesystem operations when needed.
|
||||
*
|
||||
* Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
|
||||
* separately. The currentMasterkey and nextMasterkey have been stored.
|
||||
* Also, AMRMToken has been removed from ApplicationAttemptState.
|
||||
*/
|
||||
public class FileSystemRMStateStore extends RMStateStore {
|
||||
|
||||
|
@ -78,7 +82,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
|
||||
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
||||
protected static final Version CURRENT_VERSION_INFO = Version
|
||||
.newInstance(1, 1);
|
||||
.newInstance(1, 2);
|
||||
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
|
||||
"AMRMTokenSecretManagerNode";
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
|
|||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||
|
@ -769,10 +767,7 @@ public abstract class RMStateStore extends AbstractService {
|
|||
|
||||
public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
|
||||
Credentials credentials = new Credentials();
|
||||
Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
|
||||
if(appToken != null){
|
||||
credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
|
||||
}
|
||||
|
||||
SecretKey clientTokenMasterKey =
|
||||
appAttempt.getClientTokenMasterKey();
|
||||
if(clientTokenMasterKey != null){
|
||||
|
|
|
@ -78,6 +78,11 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
|
||||
* separately. The currentMasterkey and nextMasterkey have been stored.
|
||||
* Also, AMRMToken has been removed from ApplicationAttemptState.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class ZKRMStateStore extends RMStateStore {
|
||||
|
@ -87,7 +92,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
|
||||
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
||||
protected static final Version CURRENT_VERSION_INFO = Version
|
||||
.newInstance(1, 1);
|
||||
.newInstance(1, 2);
|
||||
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
|
||||
"RMDelegationTokensRoot";
|
||||
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUtils;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -559,7 +560,22 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
|
||||
@Override
|
||||
public Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
return this.amrmToken;
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.amrmToken;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
this.amrmToken = lastToken;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -713,7 +729,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.attemptMetrics.setIsPreempted();
|
||||
}
|
||||
setMasterContainer(attemptState.getMasterContainer());
|
||||
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
|
||||
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
|
||||
attemptState.getState());
|
||||
this.recoveredFinalState = attemptState.getState();
|
||||
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
|
||||
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
|
||||
|
@ -725,9 +742,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.justFinishedContainers = attempt.getJustFinishedContainers();
|
||||
}
|
||||
|
||||
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
|
||||
throws IOException {
|
||||
if (appAttemptTokens == null) {
|
||||
private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
|
||||
RMAppAttemptState state) throws IOException {
|
||||
if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
|
||||
|| state == RMAppAttemptState.FINISHED
|
||||
|| state == RMAppAttemptState.KILLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -738,12 +757,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
|
||||
}
|
||||
|
||||
// Only one AMRMToken is stored per-attempt, so this should be fine. Can't
|
||||
// use TokenSelector as service may change - think fail-over.
|
||||
this.amrmToken =
|
||||
(Token<AMRMTokenIdentifier>) appAttemptTokens
|
||||
.getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
|
||||
rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
|
||||
rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
applicationAttemptId);
|
||||
}
|
||||
|
||||
private static class BaseTransition implements
|
||||
|
@ -779,11 +795,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
.createMasterKey(appAttempt.applicationAttemptId);
|
||||
}
|
||||
|
||||
// create AMRMToken
|
||||
appAttempt.amrmToken =
|
||||
appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
appAttempt.applicationAttemptId);
|
||||
|
||||
// Add the applicationAttempt to the scheduler and inform the scheduler
|
||||
// whether to transfer the state from previous attempt.
|
||||
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
|
||||
|
@ -896,6 +907,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
appAttempt.checkAttemptStoreError(event);
|
||||
|
||||
appAttempt.launchAttempt();
|
||||
}
|
||||
}
|
||||
|
@ -1185,11 +1197,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
appAttempt.checkAttemptStoreError(event);
|
||||
// TODO Today unmanaged AM client is waiting for app state to be Accepted to
|
||||
// launch the AM. This is broken since we changed to start the attempt
|
||||
// after the application is Accepted. We may need to introduce an attempt
|
||||
// report that client can rely on to query the attempt state and choose to
|
||||
// launch the unmanaged AM.
|
||||
|
||||
// create AMRMToken
|
||||
appAttempt.amrmToken =
|
||||
appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
appAttempt.applicationAttemptId);
|
||||
|
||||
super.transition(appAttempt, event);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -167,6 +167,11 @@ public class AMRMTokenSecretManager extends
|
|||
+ this.nextMasterKey.getMasterKey().getKeyId());
|
||||
this.currentMasterKey = this.nextMasterKey;
|
||||
this.nextMasterKey = null;
|
||||
AMRMTokenSecretManagerState state =
|
||||
AMRMTokenSecretManagerState.newInstance(
|
||||
this.currentMasterKey.getMasterKey(), null);
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
|
||||
true);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||
|
@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -412,6 +415,13 @@ public class MockRM extends ResourceManager {
|
|||
throws Exception {
|
||||
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
|
||||
am.waitForState(RMAppAttemptState.ALLOCATED);
|
||||
//create and set AMRMToken
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
appAttemptId);
|
||||
((RMAppAttemptImpl) this.rmContext.getRMApps()
|
||||
.get(appAttemptId.getApplicationId()).getRMAppAttempt(appAttemptId))
|
||||
.setAMRMToken(amrmToken);
|
||||
getRMContext()
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
|
|
|
@ -59,8 +59,9 @@ public class MockRMWithCustomAMLauncher extends MockRM {
|
|||
return containerManager;
|
||||
}
|
||||
@Override
|
||||
protected Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
Token<AMRMTokenIdentifier> amRmToken = super.getAMRMToken();
|
||||
protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
|
||||
Token<AMRMTokenIdentifier> amRmToken =
|
||||
super.createAndSetAMRMToken();
|
||||
InetSocketAddress serviceAddr =
|
||||
getConfig().getSocketAddr(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
|
|
|
@ -1208,18 +1208,13 @@ public class TestRMRestart {
|
|||
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
|
||||
attemptState.getMasterContainer().getId());
|
||||
|
||||
// the appToken and clientTokenMasterKey that are generated when
|
||||
// the clientTokenMasterKey that are generated when
|
||||
// RMAppAttempt is created,
|
||||
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
|
||||
tokenSet.add(attempt1.getAMRMToken());
|
||||
byte[] clientTokenMasterKey =
|
||||
attempt1.getClientTokenMasterKey().getEncoded();
|
||||
|
||||
// assert application credentials are saved
|
||||
Credentials savedCredentials = attemptState.getAppAttemptCredentials();
|
||||
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
|
||||
savedTokens.addAll(savedCredentials.getAllTokens());
|
||||
Assert.assertEquals(tokenSet, savedTokens);
|
||||
Assert.assertArrayEquals("client token master key not saved",
|
||||
clientTokenMasterKey, savedCredentials.getSecretKey(
|
||||
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
|
||||
|
@ -1232,11 +1227,8 @@ public class TestRMRestart {
|
|||
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||
RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
|
||||
|
||||
// assert loaded attempt recovered attempt tokens
|
||||
// assert loaded attempt recovered
|
||||
Assert.assertNotNull(loadedAttempt1);
|
||||
savedTokens.clear();
|
||||
savedTokens.add(loadedAttempt1.getAMRMToken());
|
||||
Assert.assertEquals(tokenSet, savedTokens);
|
||||
|
||||
// assert client token master key is recovered back to api-versioned
|
||||
// client token master key
|
||||
|
|
|
@ -198,8 +198,6 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
// create application token and client token key for attempt1
|
||||
Token<AMRMTokenIdentifier> appAttemptToken1 =
|
||||
generateAMRMToken(attemptId1, appTokenMgr);
|
||||
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
|
||||
attemptTokenSet1.add(appAttemptToken1);
|
||||
SecretKey clientTokenKey1 =
|
||||
clientToAMTokenMgr.createMasterKey(attemptId1);
|
||||
|
||||
|
@ -214,8 +212,6 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
// create application token and client token key for attempt2
|
||||
Token<AMRMTokenIdentifier> appAttemptToken2 =
|
||||
generateAMRMToken(attemptId2, appTokenMgr);
|
||||
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
|
||||
attemptTokenSet2.add(appAttemptToken2);
|
||||
SecretKey clientTokenKey2 =
|
||||
clientToAMTokenMgr.createMasterKey(attemptId2);
|
||||
|
||||
|
@ -280,10 +276,6 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
assertEquals(-1000, attemptState.getAMContainerExitStatus());
|
||||
// attempt1 container is loaded correctly
|
||||
assertEquals(containerId1, attemptState.getMasterContainer().getId());
|
||||
// attempt1 applicationToken is loaded correctly
|
||||
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
|
||||
savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
|
||||
assertEquals(attemptTokenSet1, savedTokens);
|
||||
// attempt1 client token master key is loaded correctly
|
||||
assertArrayEquals(clientTokenKey1.getEncoded(),
|
||||
attemptState.getAppAttemptCredentials()
|
||||
|
@ -295,10 +287,6 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
assertEquals(attemptId2, attemptState.getAttemptId());
|
||||
// attempt2 container is loaded correctly
|
||||
assertEquals(containerId2, attemptState.getMasterContainer().getId());
|
||||
// attempt2 applicationToken is loaded correctly
|
||||
savedTokens.clear();
|
||||
savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
|
||||
assertEquals(attemptTokenSet2, savedTokens);
|
||||
// attempt2 client token master key is loaded correctly
|
||||
assertArrayEquals(clientTokenKey2.getEncoded(),
|
||||
attemptState.getAppAttemptCredentials()
|
||||
|
|
|
@ -349,7 +349,6 @@ public class TestRMAppAttemptTransitions {
|
|||
assertNull(applicationAttempt.createClientToken("some client"));
|
||||
}
|
||||
assertNull(applicationAttempt.createClientToken(null));
|
||||
assertNotNull(applicationAttempt.getAMRMToken());
|
||||
// Check events
|
||||
verify(masterService).
|
||||
registerAppAttempt(applicationAttempt.getAppAttemptId());
|
||||
|
@ -445,7 +444,6 @@ public class TestRMAppAttemptTransitions {
|
|||
assertEquals(RMAppAttemptState.ALLOCATED,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
||||
|
||||
// Check events
|
||||
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
|
||||
verify(scheduler, times(2)).
|
||||
|
|
|
@ -18,15 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
|
@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -328,6 +332,51 @@ public class TestAMRMTokens {
|
|||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 20000)
|
||||
public void testAMRMMasterKeysUpdate() throws Exception {
|
||||
MockRM rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Skip the login.
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
MockNM nm = rm.registerNode("127.0.0.1:1234", 8000);
|
||||
RMApp app = rm.submitApp(200);
|
||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
|
||||
|
||||
// Do allocate. Should not update AMRMToken
|
||||
AllocateResponse response =
|
||||
am.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNull(response.getAMRMToken());
|
||||
|
||||
// roll over the master key
|
||||
// Do allocate again. the AM should get the latest AMRMToken
|
||||
rm.getRMContext().getAMRMTokenSecretManager().rollMasterKey();
|
||||
response = am.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNotNull(response.getAMRMToken());
|
||||
|
||||
Token<AMRMTokenIdentifier> amrmToken =
|
||||
ConverterUtils.convertFromYarn(response.getAMRMToken(), new Text(
|
||||
response.getAMRMToken().getService()));
|
||||
|
||||
Assert.assertEquals(amrmToken.decodeIdentifier().getKeyId(), rm
|
||||
.getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey()
|
||||
.getKeyId());
|
||||
|
||||
// Do allocate again. The master key does not update.
|
||||
// AM should not update its AMRMToken either
|
||||
response = am.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNull(response.getAMRMToken());
|
||||
|
||||
// Activate the next master key. Since there is new master key generated
|
||||
// in AMRMTokenSecretManager. The AMRMToken will not get updated for AM
|
||||
rm.getRMContext().getAMRMTokenSecretManager().activateNextMasterKey();
|
||||
response = am.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNull(response.getAMRMToken());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
private ApplicationMasterProtocol createRMClient(final MockRM rm,
|
||||
final Configuration conf, final YarnRPC rpc,
|
||||
UserGroupInformation currentUser) {
|
||||
|
|
Loading…
Reference in New Issue