YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.

(cherry picked from commit a16d022ca4313a41425c8e97841c841a2d6f2f54)
This commit is contained in:
Vinod Kumar Vavilapalli 2014-10-27 15:49:47 -07:00
parent e8d77593fa
commit 0af8cc9ca2
24 changed files with 701 additions and 124 deletions

View File

@ -334,6 +334,10 @@ Release 2.6.0 - UNRELEASED
YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
via zjshen)
YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the
sake of localization and log-aggregation for long-running services. (Jian He
via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -695,6 +695,10 @@ private static void addDeprecatedKeys() {
RM_PREFIX + "delegation-token-renewer.thread-count";
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
public static final String RM_PROXY_USER_PRIVILEGES_ENABLED = RM_PREFIX
+ "proxy-user-privileges.enabled";
public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";

View File

@ -553,6 +553,21 @@
<value>30000</value>
</property>
<property>
<description>If true, ResourceManager will have proxy-user privileges.
Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
do localization and log-aggregation on behalf of the user. If this is set to true,
ResourceManager is able to request new hdfs delegation tokens on behalf of
the user. This is needed by long-running-service, because the hdfs tokens
will eventually expire and YARN requires new valid tokens to do localization
and log-aggregation. Note that to enable this use case, the corresponding
HDFS NameNode has to configure ResourceManager as the proxy-user so that
ResourceManager can itself ask for new tokens on behalf of the user when
tokens are past their max-life-time.</description>
<name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
<value>false</value>
</property>
<property>
<description>Interval for the roll over for the master key used to generate
application tokens

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -58,4 +60,11 @@ public interface NodeHeartbeatResponse {
String getDiagnosticsMessage();
void setDiagnosticsMessage(String diagnosticsMessage);
// Credentials (i.e. hdfs tokens) needed by NodeManagers for application
// localizations and logAggreations.
Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
void setSystemCredentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials);
}

View File

@ -18,21 +18,26 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -49,6 +54,8 @@ public class NodeHeartbeatResponsePBImpl extends
private List<ContainerId> containersToCleanup = null;
private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -62,7 +69,7 @@ public NodeHeartbeatResponsePBImpl(NodeHeartbeatResponseProto proto) {
}
public NodeHeartbeatResponseProto getProto() {
mergeLocalToProto();
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
@ -86,6 +93,19 @@ private void mergeLocalToBuilder() {
builder.setNmTokenMasterKey(
convertToProtoFormat(this.nmTokenMasterKey));
}
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
}
private void addSystemCredentialsToProto() {
maybeInitBuilder();
builder.clearSystemCredentialsForApps();
for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue())));
}
}
private void mergeLocalToProto() {
@ -387,6 +407,38 @@ public void remove() {
builder.addAllApplicationsToCleanup(iterable);
}
@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
if (this.systemCredentials != null) {
return this.systemCredentials;
}
initSystemCredentials();
return systemCredentials;
}
private void initSystemCredentials() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
this.systemCredentials = new HashMap<ApplicationId, ByteBuffer> ();
for (SystemCredentialsForAppsProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp());
this.systemCredentials.put(appId, byteBuffer);
}
}
@Override
public void setSystemCredentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials) {
if (systemCredentials == null || systemCredentials.isEmpty()) {
return;
}
maybeInitBuilder();
this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
this.systemCredentials.putAll(systemCredentials);
}
@Override
public long getNextHeartBeatInterval() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -59,6 +59,12 @@ message NodeHeartbeatResponseProto {
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
}
message SystemCredentialsForAppsProto {
optional ApplicationIdProto appId = 1;
optional bytes credentialsForApp = 2;
}
message NMContainerStatusProto {

View File

@ -18,9 +18,18 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -29,10 +38,10 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -93,4 +102,33 @@ public void testRegisterNodeManagerRequest() {
Assert.assertEquals(1, requestProto.getRunningApplications().size());
Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
}
@Test
public void testNodeHeartBeatResponse() throws IOException {
NodeHeartbeatResponse record =
Records.newRecord(NodeHeartbeatResponse.class);
Map<ApplicationId, ByteBuffer> appCredentials =
new HashMap<ApplicationId, ByteBuffer>();
Credentials app1Cred = new Credentials();
Token<DelegationTokenIdentifier> token1 =
new Token<DelegationTokenIdentifier>();
token1.setKind(new Text("kind1"));
app1Cred.addToken(new Text("token1"), token1);
Token<DelegationTokenIdentifier> token2 =
new Token<DelegationTokenIdentifier>();
token2.setKind(new Text("kind2"));
app1Cred.addToken(new Text("token2"), token2);
DataOutputBuffer dob = new DataOutputBuffer();
app1Cred.writeTokenStorageToStream(dob);
ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
record.setSystemCredentialsForApps(appCredentials);
NodeHeartbeatResponse proto =
new NodeHeartbeatResponsePBImpl(
((NodeHeartbeatResponsePBImpl) record).getProto());
Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
}
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -54,6 +56,8 @@ public interface Context {
ConcurrentMap<ApplicationId, Application> getApplications();
Map<ApplicationId, Credentials> getSystemCredentialsForApps();
ConcurrentMap<ContainerId, Container> getContainers();
NMContainerTokenSecretManager getContainerTokenSecretManager();

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@ -32,6 +34,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
@ -313,6 +316,10 @@ public static class NMContext implements Context {
private NodeId nodeId = null;
protected final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>();
private Map<ApplicationId, Credentials> systemCredentials =
new HashMap<ApplicationId, Credentials>();
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
@ -420,6 +427,16 @@ public boolean getDecommissioned() {
public void setDecommissioned(boolean isDecommissioned) {
this.isDecommissioned = isDecommissioned;
}
@Override
public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
return systemCredentials;
}
public void setSystemCrendentials(
Map<ApplicationId, Credentials> systemCredentials) {
this.systemCredentials = systemCredentials;
}
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -36,7 +37,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
@ -62,6 +65,7 @@
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -525,6 +529,25 @@ public long getRMIdentifier() {
return this.rmIdentifier;
}
private static Map<ApplicationId, Credentials> parseCredentials(
Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
Map<ApplicationId, Credentials> map =
new HashMap<ApplicationId, Credentials>();
for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
ByteBuffer buffer = entry.getValue();
buffer.rewind();
buf.reset(buffer);
credentials.readTokenStorageStream(buf);
map.put(entry.getKey(), credentials);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieved credentials form RM: " + map);
}
return map;
}
protected void startStatusUpdater() {
statusUpdaterRunnable = new Runnable() {
@ -598,6 +621,13 @@ public void run() {
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
Map<ApplicationId, ByteBuffer> systemCredentials =
response.getSystemCredentialsForApps();
if (systemCredentials != null && !systemCredentials.isEmpty()) {
((NMContext) context)
.setSystemCrendentials(parseCredentials(systemCredentials));
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(

View File

@ -186,7 +186,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
this.metrics = metrics;
rsrcLocalizationSrvc =
createResourceLocalizationService(exec, deletionContext);
createResourceLocalizationService(exec, deletionContext, context);
addService(rsrcLocalizationSrvc);
containersLauncher = createContainersLauncher(context, exec);
@ -362,9 +362,9 @@ public ContainersMonitor getContainersMonitor() {
}
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
ContainerExecutor exec, DeletionService deletionContext, Context context) {
return new ResourceLocalizationService(this.dispatcher, exec,
deletionContext, dirsHandler, context.getNMStateStore());
deletionContext, dirsHandler, context);
}
protected ContainersLauncher createContainersLauncher(Context context,

View File

@ -83,11 +83,11 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -158,6 +158,7 @@ public class ResourceLocalizationService extends CompositeService
private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
private Context nmContext;
/**
* Map of LocalResourceTrackers keyed by username, for private
@ -177,7 +178,7 @@ public class ResourceLocalizationService extends CompositeService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
LocalDirsHandlerService dirsHandler, Context context) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@ -189,7 +190,8 @@ public ResourceLocalizationService(Dispatcher dispatcher,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
this.stateStore = stateStore;
this.stateStore = context.getNMStateStore();
this.nmContext = context;
}
FileContext getLocalFileContext(Configuration conf) {
@ -1110,11 +1112,36 @@ public void run() {
}
}
private Credentials getSystemCredentialsSentFromRM(
LocalizerContext localizerContext) throws IOException {
ApplicationId appId =
localizerContext.getContainerId().getApplicationAttemptId()
.getApplicationId();
Credentials systemCredentials =
nmContext.getSystemCredentialsForApps().get(appId);
if (systemCredentials == null) {
return null;
}
LOG.info("Adding new framework tokens from RM for " + appId);
for (Token<?> token : systemCredentials.getAllTokens()) {
LOG.info("Adding new application-token for localization: " + token);
}
return systemCredentials;
}
private void writeCredentials(Path nmPrivateCTokensPath)
throws IOException {
DataOutputStream tokenOut = null;
try {
Credentials credentials = context.getCredentials();
if (UserGroupInformation.isSecurityEnabled()) {
Credentials systemCredentials =
getSystemCredentialsSentFromRM(context);
if (systemCredentials != null) {
credentials = systemCredentials;
}
}
FileContext lfs = getLocalFileContext(getConfig());
tokenOut =
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));

View File

@ -39,9 +39,10 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -60,6 +61,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -342,6 +344,18 @@ protected void initAppAggregator(final ApplicationId appId, String user,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
if (UserGroupInformation.isSecurityEnabled()) {
Credentials systemCredentials =
context.getSystemCredentialsForApps().get(appId);
if (systemCredentials != null) {
LOG.info("Adding new framework tokens from RM for " + appId);
for (Token<?> token : systemCredentials.getAllTokens()) {
LOG.info("Adding new application-token for log-aggregation: " + token);
}
credentials = systemCredentials;
}
}
// Get user's FileSystem credentials
final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);

View File

@ -54,7 +54,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
public class DummyContainerManager extends ContainerManagerImpl {
@ -74,9 +73,9 @@ public DummyContainerManager(Context context, ContainerExecutor exec,
@Override
@SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
ContainerExecutor exec, DeletionService deletionContext, Context context) {
return new ResourceLocalizationService(super.dispatcher, exec,
deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
deletionContext, super.dirsHandler, context) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {

View File

@ -44,10 +44,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -561,6 +565,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// Test NodeStatusUpdater sends the right container statuses each time it
// heart beats.
private Credentials expectedCredentials = new Credentials();
private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL;
@ -576,6 +581,11 @@ private class MyResourceTracker4 implements ResourceTracker {
createContainerStatus(5, ContainerState.COMPLETE);
public MyResourceTracker4(Context context) {
// create app Credentials
org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 =
new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>();
token1.setKind(new Text("kind1"));
expectedCredentials.addToken(new Text("token1"), token1);
this.context = context;
}
@ -694,6 +704,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L);
nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
Map<ApplicationId, ByteBuffer> appCredentials =
new HashMap<ApplicationId, ByteBuffer>();
DataOutputBuffer dob = new DataOutputBuffer();
expectedCredentials.writeTokenStorageToStream(dob);
ByteBuffer byteBuffer1 =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
nhResponse.setSystemCredentialsForApps(appCredentials);
return nhResponse;
}
}
@ -1293,6 +1311,8 @@ protected NMContext createNMContext(
if(assertionFailedInThread.get()) {
Assert.fail("ContainerStatus Backup failed");
}
Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
.get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
nm.stop();
}

View File

@ -278,8 +278,7 @@ private void waitForAppState(Application app, ApplicationState state)
private ContainerManagerImpl createContainerManager(Context context) {
final LogHandler logHandler = mock(LogHandler.class);
final ResourceLocalizationService rsrcSrv =
new ResourceLocalizationService(null, null, null, null,
context.getNMStateStore()) {
new ResourceLocalizationService(null, null, null, null, context) {
@Override
public void serviceInit(Configuration conf) throws Exception {
}
@ -320,7 +319,7 @@ protected LogHandler createLogHandler(Configuration conf,
@Override
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
ContainerExecutor exec, DeletionService deletionContext, Context context) {
return rsrcSrv;
}

View File

@ -23,7 +23,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Test;
public class TestLocalCacheDirectoryManager {
@ -73,8 +78,12 @@ public void testMinimumPerDirectoryFileLimit() {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
Exception e = null;
NMContext nmContext =
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService());
ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null, null);
new ResourceLocalizationService(null, null, null, null, nmContext);
try {
service.init(conf);
} catch (Exception e1) {

View File

@ -105,6 +105,7 @@
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@ -138,6 +139,9 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
@ -159,7 +163,7 @@ public class TestResourceLocalizationService {
private Configuration conf;
private AbstractFileSystem spylfs;
private FileContext lfs;
private NMContext nmContext;
@BeforeClass
public static void setupClass() {
mockServer = mock(Server.class);
@ -174,6 +178,9 @@ public void setup() throws IOException {
String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
nmContext = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService());
}
@After
@ -206,8 +213,7 @@ public void testLocalizationInit() throws Exception {
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler,
new NMNullStateStoreService()));
diskhandler, nmContext));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@ -268,8 +274,7 @@ public void testDirectoryCleanupOnNewlyCreatedStateStore()
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler,
nmStateStoreService));
diskhandler,nmContext));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@ -340,8 +345,7 @@ public void testResourceRelease() throws Exception {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler,
new NMNullStateStoreService());
dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@ -751,8 +755,7 @@ public void testLocalizationHeartbeat() throws Exception {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler,
new NMNullStateStoreService());
dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -965,8 +968,7 @@ public void testFailedPublicResource() throws Exception {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler,
new NMNullStateStoreService());
dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -1075,7 +1077,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy, new NMNullStateStoreService());
dirsHandlerSpy, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -1188,7 +1190,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler, new NMNullStateStoreService());
localDirHandler, nmContext);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@ -1341,7 +1343,7 @@ public void testLocalResourcePath() throws Exception {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler, new NMNullStateStoreService());
localDirHandler, nmContext);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@ -1507,7 +1509,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception {
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
dirsHandler, new NMNullStateStoreService());
dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
@ -1795,9 +1797,13 @@ private ResourceLocalizationService createSpyService(
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
DeletionService delService = mock(DeletionService.class);
NMContext nmContext =
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, stateStore);
dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
@ -1861,7 +1867,7 @@ public void testFailedDirsResourceRelease() throws Exception {
// setup mocks
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
mockDirsHandler, new NMNullStateStoreService());
mockDirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(

View File

@ -278,7 +278,8 @@ protected void submitApplication(
try {
credentials = parseCredentials(submissionContext);
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
credentials, submissionContext.getCancelTokensWhenComplete());
credentials, submissionContext.getCancelTokensWhenComplete(),
application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
@ -325,7 +326,8 @@ protected void submitApplication(
credentials = parseCredentials(appContext);
// synchronously renew delegation token on recovery.
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
credentials, appContext.getCancelTokensWhenComplete());
credentials, appContext.getCancelTokensWhenComplete(),
application.getUser());
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
} catch (Exception e) {
LOG.warn("Unable to parse and renew delegation tokens.", e);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -57,6 +58,8 @@ public interface RMContext {
ConcurrentMap<ApplicationId, RMApp> getRMApps();
ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
ConcurrentMap<String, RMNode> getInactiveRMNodes();
ConcurrentMap<NodeId, RMNode> getRMNodes();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -67,6 +68,9 @@ public class RMContextImpl implements RMContext {
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
private boolean isHAEnabled;
private boolean isWorkPreservingRecoveryEnabled;
private HAServiceState haServiceState =
@ -444,4 +448,8 @@ public boolean isSchedulerReadyForAllocatingContainers() {
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
}
}

View File

@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,6 +33,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -387,7 +390,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
.getResponseId()) {
LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress());
+ rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
.getResponseId()) {
@ -412,6 +415,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
populateKeys(request, nodeHeartBeatResponse);
ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
rmContext.getSystemCredentialsForApps();
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@ -45,14 +47,20 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -82,12 +90,10 @@ public class DelegationTokenRenewer extends AbstractService {
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
private ThreadPoolExecutor renewerService;
// managing the list of tokens using Map
// appId=>List<tokens>
private Set<DelegationTokenToRenew> delegationTokens =
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
new ConcurrentHashMap<ApplicationId, Long>();
@ -99,20 +105,33 @@ public class DelegationTokenRenewer extends AbstractService {
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
private boolean tokenKeepAliveEnabled;
private boolean hasProxyUserPrivileges;
private long credentialsValidTimeRemaining;
// this config is supposedly not used by end-users.
public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining";
public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
10800000; // 3h
public DelegationTokenRenewer() {
super(DelegationTokenRenewer.class.getName());
}
@Override
protected synchronized void serviceInit(Configuration conf) throws Exception {
protected void serviceInit(Configuration conf) throws Exception {
this.hasProxyUserPrivileges =
conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
this.tokenKeepAliveEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
this.credentialsValidTimeRemaining =
conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
@ -182,7 +201,7 @@ protected void serviceStop() {
if (renewalTimer != null) {
renewalTimer.cancel();
}
delegationTokens.clear();
appTokens.clear();
this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
@ -212,22 +231,28 @@ protected static class DelegationTokenToRenew {
public long expirationDate;
public TimerTask timerTask;
public final boolean shouldCancelAtEnd;
public DelegationTokenToRenew(
ApplicationId jId, Token<?> token,
Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
public long maxDate;
public String user;
public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
String user) {
this.token = token;
this.user = user;
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
try {
AbstractDelegationTokenIdentifier identifier =
(AbstractDelegationTokenIdentifier) token.decodeIdentifier();
maxDate = identifier.getMaxDate();
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
this.applicationId = jId;
this.conf = conf;
this.expirationDate = expirationDate;
this.timerTask = null;
this.shouldCancelAtEnd = shouldCancelAtEnd;
if (this.token==null || this.applicationId==null || this.conf==null) {
throw new IllegalArgumentException("Invalid params to renew token" +
";token=" + this.token +
";appId=" + this.applicationId +
";conf=" + this.conf);
}
}
public void setTimerTask(TimerTask tTask) {
@ -317,16 +342,14 @@ public Void run() throws Exception {
}
}
}
//adding token
private void addTokenToList(DelegationTokenToRenew t) {
delegationTokens.add(t);
}
@VisibleForTesting
public Set<Token<?>> getDelegationTokens() {
Set<Token<?>> tokens = new HashSet<Token<?>>();
for(DelegationTokenToRenew delegationToken : delegationTokens) {
tokens.add(delegationToken.token);
for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
for (DelegationTokenToRenew token : tokenList) {
tokens.add(token.token);
}
}
return tokens;
}
@ -337,25 +360,28 @@ public Set<Token<?>> getDelegationTokens() {
* @param ts tokens
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
* done else false.
* @param user user
* @throws IOException
*/
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
boolean shouldCancelAtEnd) {
boolean shouldCancelAtEnd, String user) {
processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
applicationId, ts, shouldCancelAtEnd));
applicationId, ts, shouldCancelAtEnd, user));
}
/**
* Synchronously renew delegation tokens.
* @param user user
*/
public void addApplicationSync(ApplicationId applicationId, Credentials ts,
boolean shouldCancelAtEnd) throws IOException{
boolean shouldCancelAtEnd, String user) throws IOException,
InterruptedException {
handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
applicationId, ts, shouldCancelAtEnd));
applicationId, ts, shouldCancelAtEnd, user));
}
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
throws IOException {
throws IOException, InterruptedException {
ApplicationId applicationId = evt.getApplicationId();
Credentials ts = evt.getCredentials();
boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
@ -375,14 +401,21 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
// all renewable tokens are valid
// At RM restart it is safe to assume that all the previously added tokens
// are valid
List<DelegationTokenToRenew> tokenList =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
appTokens.put(applicationId,
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
boolean hasHdfsToken = false;
for (Token<?> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
LOG.info(applicationId + " found existing hdfs token " + token);
hasHdfsToken = true;
}
}
}
if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
@ -395,14 +428,15 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
}
}
for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
appTokens.get(applicationId).add(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
LOG.debug("Registering token for renewal for:" + " service = "
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
}
if (!hasHdfsToken) {
requestNewHdfsDelegationToken(applicationId, evt.getUser(),
shouldCancelAtEnd);
}
}
/**
@ -424,14 +458,16 @@ public synchronized void run() {
}
Token<?> token = dttr.token;
try {
renewToken(dttr);
if (LOG.isDebugEnabled()) {
LOG.debug("Renewing delegation-token for:" + token.getService() +
"; new expiration;" + dttr.expirationDate);
requestNewHdfsDelegationTokenIfNeeded(dttr);
// if the token is not replaced by a new token, renew the token
if (appTokens.get(dttr.applicationId).contains(dttr)) {
renewToken(dttr);
setTimerForTokenRenewal(dttr);// set the next one
} else {
LOG.info("The token was removed already. Token = [" +dttr +"]");
}
setTimerForTokenRenewal(dttr);// set the next one
} catch (Exception e) {
LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
removeFailedDelegationToken(dttr);
@ -455,12 +491,14 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
// calculate timer time
long expiresIn = token.expirationDate - System.currentTimeMillis();
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
// need to create new task every time
TimerTask tTask = new RenewalTimerTask(token);
token.setTimerTask(tTask); // keep reference to the timer
renewalTimer.schedule(token.timerTask, new Date(renewIn));
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
+ token.applicationId);
}
// renew a token
@ -470,16 +508,99 @@ protected void renewToken(final DelegationTokenToRenew dttr)
// need to use doAs so that http can find the kerberos tgt
// NOTE: token renewers should be responsible for the correct UGI!
try {
dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Long>(){
@Override
public Long run() throws Exception {
return dttr.token.renew(dttr.conf);
}
});
dttr.expirationDate =
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return dttr.token.renew(dttr.conf);
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
LOG.info("Renewed delegation-token= [" + dttr + "], for "
+ dttr.applicationId);
}
// Request new hdfs token if the token is about to expire, and remove the old
// token from the tokenToRenew list
private void requestNewHdfsDelegationTokenIfNeeded(
final DelegationTokenToRenew dttr) throws IOException,
InterruptedException {
if (hasProxyUserPrivileges
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
// remove all old expiring hdfs tokens for this application.
Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
if (tokenSet != null && !tokenSet.isEmpty()) {
Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
synchronized (tokenSet) {
while (iter.hasNext()) {
DelegationTokenToRenew t = iter.next();
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
iter.remove();
if (t.timerTask != null) {
t.timerTask.cancel();
}
LOG.info("Removed expiring token " + t);
}
}
}
}
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
dttr.shouldCancelAtEnd);
}
}
private void requestNewHdfsDelegationToken(ApplicationId applicationId,
String user, boolean shouldCancelAtEnd) throws IOException,
InterruptedException {
// Get new hdfs tokens for this user
Credentials credentials = new Credentials();
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
// Add new tokens to the toRenew list.
LOG.info("Received new tokens for " + applicationId + ". Received "
+ newTokens.length + " tokens.");
if (newTokens.length > 0) {
for (Token<?> token : newTokens) {
if (token.isManaged()) {
DelegationTokenToRenew tokenToRenew =
new DelegationTokenToRenew(applicationId, token, getConfig(),
Time.now(), shouldCancelAtEnd, user);
// renew the token to get the next expiration date.
renewToken(tokenToRenew);
setTimerForTokenRenewal(tokenToRenew);
appTokens.get(applicationId).add(tokenToRenew);
LOG.info("Received new token " + token);
}
}
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
}
protected Token<?>[] obtainSystemTokensForUser(String user,
final Credentials credentials) throws IOException, InterruptedException {
// Get new hdfs tokens on behalf of this user
UserGroupInformation proxyUser =
UserGroupInformation.createProxyUser(user,
UserGroupInformation.getLoginUser());
Token<?>[] newTokens =
proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
@Override
public Token<?>[] run() throws Exception {
return FileSystem.get(getConfig()).addDelegationTokens(
UserGroupInformation.getLoginUser().getUserName(), credentials);
}
});
return newTokens;
}
// cancel a token
@ -497,13 +618,13 @@ private void cancelToken(DelegationTokenToRenew t) {
*/
private void removeFailedDelegationToken(DelegationTokenToRenew t) {
ApplicationId applicationId = t.applicationId;
if (LOG.isDebugEnabled())
LOG.debug("removing failed delegation token for appid=" + applicationId +
";t=" + t.token.getService());
delegationTokens.remove(t);
LOG.error("removing failed delegation token for appid=" + applicationId
+ ";t=" + t.token.getService());
appTokens.get(applicationId).remove(t);
// cancel the timer
if(t.timerTask!=null)
if (t.timerTask != null) {
t.timerTask.cancel();
}
}
/**
@ -543,18 +664,21 @@ public void updateKeepAliveApplications(List<ApplicationId> appIds) {
}
private void removeApplicationFromRenewal(ApplicationId applicationId) {
synchronized (delegationTokens) {
Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
while(it.hasNext()) {
DelegationTokenToRenew dttr = it.next();
if (dttr.applicationId.equals(applicationId)) {
rmContext.getSystemCredentialsForApps().remove(applicationId);
Set<DelegationTokenToRenew> tokens = appTokens.get(applicationId);
if (tokens != null && !tokens.isEmpty()) {
synchronized (tokens) {
Iterator<DelegationTokenToRenew> it = tokens.iterator();
while (it.hasNext()) {
DelegationTokenToRenew dttr = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Removing delegation token for appId=" + applicationId +
"; token=" + dttr.token.getService());
LOG.debug("Removing delegation token for appId=" + applicationId
+ "; token=" + dttr.token.getService());
}
// cancel the timer
if(dttr.timerTask!=null)
if (dttr.timerTask != null)
dttr.timerTask.cancel();
// cancel the token
@ -670,17 +794,19 @@ private void handleDTRenewerAppSubmitEvent(
}
}
private static class DelegationTokenRenewerAppSubmitEvent extends
static class DelegationTokenRenewerAppSubmitEvent extends
DelegationTokenRenewerEvent {
private Credentials credentials;
private boolean shouldCancelAtEnd;
private String user;
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
Credentials credentails, boolean shouldCancelAtEnd) {
Credentials credentails, boolean shouldCancelAtEnd, String user) {
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
this.credentials = credentails;
this.shouldCancelAtEnd = shouldCancelAtEnd;
this.user = user;
}
public Credentials getCredentials() {
@ -690,6 +816,10 @@ public Credentials getCredentials() {
public boolean shouldCancelAtEnd() {
return shouldCancelAtEnd;
}
public String getUser() {
return user;
}
}
enum DelegationTokenRenewerEventType {

View File

@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@ -54,6 +55,7 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@ -61,6 +63,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -74,11 +77,16 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
@ -88,16 +96,18 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
/**
* unit test -
* tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "unchecked"})
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
private static BlockingQueue<Event> eventQueue;
private static volatile AtomicInteger counter;
@ -125,6 +135,9 @@ public boolean isManaged(Token<?> token) throws IOException {
@Override
public long renew(Token<?> t, Configuration conf) throws IOException {
if ( !(t instanceof MyToken)) {
return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
}
MyToken token = (MyToken)t;
if(token.isCanceled()) {
throw new InvalidToken("token has been canceled");
@ -179,8 +192,10 @@ public void setUp() throws Exception {
dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getSystemCredentialsForApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
when(mockContext.getDelegationTokenRenewer()).thenReturn(
delegationTokenRenewer);
when(mockContext.getDispatcher()).thenReturn(dispatcher);
@ -290,9 +305,9 @@ static MyToken createTokens(Text renewer)
Text user1= new Text("user1");
MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
3600000, null);
sm.startThreads();
@ -353,7 +368,7 @@ public void testDTRenewal () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
waitForEventsToGetProcessed(delegationTokenRenewer);
// first 3 initial renewals + 1 real
@ -393,7 +408,7 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
@ -429,7 +444,7 @@ public void testAppRejectionWithCancelledDelegationToken() throws Exception {
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
delegationTokenRenewer.addApplicationAsync(appId, ts, true);
delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
int waitCnt = 20;
while (waitCnt-- >0) {
if (!eventQueue.isEmpty()) {
@ -473,7 +488,7 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
@ -516,6 +531,8 @@ public void testDTKeepAlive1 () throws Exception {
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(lconf, counter);
RMContext mockContext = mock(RMContext.class);
when(mockContext.getSystemCredentialsForApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
@ -540,7 +557,7 @@ public void testDTKeepAlive1 () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplicationAsync(applicationId_0, ts, true);
localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
waitForEventsToGetProcessed(localDtr);
if (!eventQueue.isEmpty()){
Event evt = eventQueue.take();
@ -593,6 +610,8 @@ public void testDTKeepAlive2() throws Exception {
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
when(mockContext.getSystemCredentialsForApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
@ -617,7 +636,7 @@ public void testDTKeepAlive2() throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplicationAsync(applicationId_0, ts, true);
localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
localDtr.applicationFinished(applicationId_0);
waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
@ -640,7 +659,7 @@ public void testDTKeepAlive2() throws Exception {
private DelegationTokenRenewer createNewDelegationTokenRenewer(
Configuration conf, final AtomicInteger counter) {
return new DelegationTokenRenewer() {
DelegationTokenRenewer renew = new DelegationTokenRenewer() {
@Override
protected ThreadPoolExecutor
@ -664,6 +683,8 @@ public void execute(Runnable command) {
return pool;
}
};
renew.setRMContext(TestUtils.getMockRMContext());
return renew;
}
private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
@ -679,7 +700,12 @@ private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
public void testDTRonAppSubmission()
throws IOException, InterruptedException, BrokenBarrierException {
final Credentials credsx = new Credentials();
final Token<?> tokenx = mock(Token.class);
final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
new Text("user1"));
when(tokenx.decodeIdentifier()).thenReturn(dtId1);
credsx.addToken(new Text("token"), tokenx);
doReturn(true).when(tokenx).isManaged();
doThrow(new IOException("boom"))
@ -688,6 +714,8 @@ public void testDTRonAppSubmission()
final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
when(mockContext.getSystemCredentialsForApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
@ -699,7 +727,7 @@ public void testDTRonAppSubmission()
dtr.start();
try {
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
fail("Catch IOException on app submission");
} catch (IOException e){
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
@ -716,7 +744,12 @@ public void testConcurrentAddApplication()
// this token uses barriers to block during renew
final Credentials creds1 = new Credentials();
final Token<?> token1 = mock(Token.class);
final Token<DelegationTokenIdentifier> token1 = mock(Token.class);
when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
new Text("user1"));
when(token1.decodeIdentifier()).thenReturn(dtId1);
creds1.addToken(new Text("token"), token1);
doReturn(true).when(token1).isManaged();
doAnswer(new Answer<Long>() {
@ -729,7 +762,9 @@ public Long answer(InvocationOnMock invocation)
// this dummy token fakes renewing
final Credentials creds2 = new Credentials();
final Token<?> token2 = mock(Token.class);
final Token<DelegationTokenIdentifier> token2 = mock(Token.class);
when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
when(token2.decodeIdentifier()).thenReturn(dtId1);
creds2.addToken(new Text("token"), token2);
doReturn(true).when(token2).isManaged();
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
@ -737,7 +772,9 @@ public Long answer(InvocationOnMock invocation)
// fire up the renewer
final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
RMContext mockContext = mock(RMContext.class);
when(mockContext.getSystemCredentialsForApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
@ -751,14 +788,14 @@ public Long answer(InvocationOnMock invocation)
Thread submitThread = new Thread() {
@Override
public void run() {
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
}
};
submitThread.start();
// wait till 1st submit blocks, then submit another
startBarrier.await();
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
// signal 1st to complete
endBarrier.await();
submitThread.join();
@ -793,4 +830,139 @@ public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
"Bad header found in token storage"));
}
}
@Test (timeout = 20000)
public void testReplaceExpiringDelegationToken() throws Exception {
conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
// create Token1:
Text userText1 = new Text("user1");
DelegationTokenIdentifier dtId1 =
new DelegationTokenIdentifier(userText1, new Text("renewer1"),
userText1);
// set max date to 0 to simulate an expiring token;
dtId1.setMaxDate(0);
final Token<DelegationTokenIdentifier> token1 =
new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
"password1".getBytes(), dtId1.getKind(), new Text("service1"));
// create token2
Text userText2 = new Text("user2");
DelegationTokenIdentifier dtId2 =
new DelegationTokenIdentifier(userText1, new Text("renewer2"),
userText2);
final Token<DelegationTokenIdentifier> expectedToken =
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
final MockRM rm = new TestSecurityMockRM(conf, null) {
@Override
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer() {
@Override
protected Token<?>[] obtainSystemTokensForUser(String user,
final Credentials credentials) throws IOException {
credentials.addToken(expectedToken.getService(), expectedToken);
return new Token<?>[] { expectedToken };
}
};
}
};
rm.start();
Credentials credentials = new Credentials();
credentials.addToken(userText1, token1);
RMApp app =
rm.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1,
credentials);
// wait for the initial expiring hdfs token to be removed.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
return !rm.getRMContext().getDelegationTokenRenewer()
.getDelegationTokens().contains(token1);
}
}, 1000, 20000);
// wait for the new retrieved hdfs token.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
return rm.getRMContext().getDelegationTokenRenewer()
.getDelegationTokens().contains(expectedToken);
}
}, 1000, 20000);
// check nm can retrieve the token
final MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
ByteBuffer tokenBuffer =
response.getSystemCredentialsForApps().get(app.getApplicationId());
Assert.assertNotNull(tokenBuffer);
Credentials appCredentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
tokenBuffer.rewind();
buf.reset(tokenBuffer);
appCredentials.readTokenStorageStream(buf);
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
}
// YARN will get the token for the app submitted without the delegation token.
@Test
public void testAppSubmissionWithoutDelegationToken() throws Exception {
// create token2
Text userText2 = new Text("user2");
DelegationTokenIdentifier dtId2 =
new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"),
userText2);
final Token<DelegationTokenIdentifier> token2 =
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
final MockRM rm = new TestSecurityMockRM(conf, null) {
@Override
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer() {
@Override
protected Token<?>[] obtainSystemTokensForUser(String user,
final Credentials credentials) throws IOException {
credentials.addToken(token2.getService(), token2);
return new Token<?>[] { token2 };
}
};
}
};
rm.start();
// submit an app without delegationToken
RMApp app = rm.submitApp(200);
// wait for the new retrieved hdfs token.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
return rm.getRMContext().getDelegationTokenRenewer()
.getDelegationTokens().contains(token2);
}
}, 1000, 20000);
// check nm can retrieve the token
final MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
ByteBuffer tokenBuffer =
response.getSystemCredentialsForApps().get(app.getApplicationId());
Assert.assertNotNull(tokenBuffer);
Credentials appCredentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
tokenBuffer.rewind();
buf.reset(tokenBuffer);
appCredentials.readTokenStorageStream(buf);
Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
}
}